Apache Airflow でタスクスケジューリングしてみた ~タスク開発のポイント~
- TAG : Airflow | Tech & Science
- POSTED : 2018.02.07 14:56
f t p h l
目次
Airflowのタスク処理はPythonで開発すると便利
前回までは簡単なジョブ処理の作成・実行まででしたが、今回はちょっと応用編です。そのため、Pythonの基礎的なスキルが必要になりますが、Pythonを使うことで「やれること」も多くなり、タスクの開発手法も大きく変わります。なるべく、Python未経験者でもメリットが分かるように書いたつもりですので読んでいただければと思います。
タスク(オペレーター)の種類を整理しよう
Airflowのタスクはオペレーターファンクションによって作成することができます。オペレーターファンクションは50個以上あり、日々、有識者によって追加・改善が行われています。若干、公式ドキュメントが古いため、GitHubも合わせて見てください。
オペレーターには大きく分けて2種類あり、BaseOperatorファンクションをベースとした処理を行うオペレーターと、BaseSensorOperatorファンクションをベースとしたセンサーオペレーターがあります。センサーオペレーターは「ストレージ内にファイルが作成された」などの挙動を検知して、次のタスクを実行するトリガー的なファンクションです。今回はセンサーオペレーター以外について説明します。
Bashシェルによるタスク処理
チュートリアルでも使われているBashOperatorファンクションによってBashシェルを実行できます。Bashシェルをそのまま使えますので、簡単なプログラミングやC言語などで別途開発したタスクファイルを実行することが可能です。
Pythonプログラミングによるタスク処理
PythonOperatorファンクションによってPython命令を実行することができます。(詳しい説明は次の章で)
SQL命令によるタスク処理
PostgresOperatorファンクションなどのよってSQL命令を実行することができます。PostgreSQLの他にORACLE、MySQL、SQLiteなどに対応し、それぞれファンクションが異なります。(詳しくはGitHub)これらデータベースの接続情報は、Airflow内のconnectionsを使用しているため、Webの管理コンソール、または、コマンドラインを使って、追加・変更を行ってください。
特定機能の処理を行うタスク処理
オペレーターファンクションの中には上記のような汎用的なファンクション以外にHiveなどの特定の機能の処理を行うファンクションも存在します。特にGoogle Cloud Platform関係のファンクションが多く、BigQuery、Google Cloud Storage などに対応しています。これらのオペレーターファンクションは公式ドキュメントには情報がない物が多いため、GitHubのプログラムソースを見てください。
タスク処理をPythonで開発するとファンクション呼び出しができる
Airflowのオペレーターファンクションを使えば、データファイルのデータベースへのインポートからSQL命令によるクレンジングや集計が行えるため、単純なELT処理ならノンプログラミングで作れます。しかし、データファイルなどの内容によって実行するタスクが変わったり、繰り返し処理が多い場合はオペレーターファンクションのみでの実装に限界があります。そこで別途プログラミングによるタスク処理開発が必要になるのですが、私はPythonプログラミングによるタスク処理開発をお勧めします。
BashOperatorファンクションを使えばタスクファイルを実行できるのですが、実行プロセスが分かれているため、タスクファイル内での実行エラー時に原因特定が難しいです。また、タスクファイルにパラメタを渡す場合も制約が多く、スマートなやり方ではありません。
そこでPythonOperatorファンクションを使用して、タスク処理をファンクション呼び出しします。タスク処理をAirflowの本体(DAGファイル)と分けてPythonファイルを作成し、別途開発・テストを行います。そして、下記のようにPythonOperatorファンクションのpython_callableパラメタにタスク処理のファンクション(下記の場合はfunc_hogehoge)を記述すれば、タスク処理をファンクション呼び出しできます。
1 2 3 4 |
t1 = PythonOperator( task_id = 'task1', python_callable = func_hogehoge, dag = dag) |
タスク処理をファンクション呼び出しにすることによって、実行プロセスが分かれず、エラー時のデバックが行いやすいです。また、タスク処理ファンクションへのパラメタもシンプルになり、工夫次第では例外処理も組み込むことが可能です。
環境に依存する情報はAirflowのVariableで管理する
開発環境、検証環境、そして本番環境など開発工程ではそれぞれ実行環境が用意され、それらのデータベースやファイルの入出力先などの情報は実行環境によって異なります。これら実行環境に依存する情報はコンテキスト情報としてXMLなどの別ファイルに退避させて、環境ごとに持たせることが一般的です。
Airflowの場合、このコンテキスト情報の代用ができるのが「Variable」になります。VariableはWebの管理コンソール、または、コマンドラインを使って、追加・変更が可能です。
このVariable機能を使用し、下記のようにAirflowの本体(DAGファイル)からコンテキストのPythonファイルの情報を書き換えることで、全くタスク処理本体であるPythonファイルを編集することなく、Airflow側の設定だけで開発環境→検証環境→本番環境のように実行環境のステージング時の設定が可能です。(Variable情報の呼び出しは Variable.get(‘[Variableキー]’))また、Airflowの実行環境をDockerイメージなどに固めておけば、実行環境の複製も非常に楽です。
PythonOperatorファンクションで開発分業
AirflowのPythonOperatorファンクションを使うことでPythonで開発したタスク処理ファンクションを簡単に呼び出せるようになります。また、Variableを使用することで環境に依存する情報をAirflowの設定で管理・運用することが可能です。この様にAirflowのジョブとタスク処理本体を独立させることで、ジョブ運用者とタスク処理開発者で分業でき、インターフェースさえ意識を合わせておけば各々の作業範囲に集中することができます。
現在、弊社ではAirflowを本番環境で運用中です。まだ運用開始して間もありませんが、暫くして運用ノウハウが溜まりましたら運用トピックをお伝えできればと思います。
連載:Apache Airflow でタスクスケジューリングしてみた
- Airflowによって開発負荷が変わる
- 公式ドキュメントの読み解き方
- タスク開発のポイント(本編)
- ログを退避させる
f t p h l