目次
Airflow公式ドキュメントは難解。重要ポイントから先に読み解こう
前回、タスクスケジューラーの必要性とAirflowの概要やメリットについてご紹介しました。しかし、「とりあえず触ってみよう!」っと思ってもAirflow公式ドキュメントは英語、かつ専門的過ぎて難解かもしれません。たぶん、はじめてタスクスケジューラーを触る人にとっては、頭から順に読んでいっても分からないと思います。そのため「公式ドキュメントの読み解き方」について、私なりの考えをご紹介します。
とりあえずチュートリアルまでやろう
前回でもご説明しましたが、AirflowのタスクスケジューリングはPythonプログラミングで行いますが、Pythonプログラミングスキルが殆どなくてもできます。ただ、Bashシェルを順に呼ぶだけのバッチであれば、Pythonの条件式(if)やループ(for)などの記述は必要ありません。それを信じでクイックスタートからチュートリアルまでやってください。
ざっくり、チュートリアルのサンプルプログラムやっていることを説明すると、下記のように3つのタスクを作成して、最後にタスクの実行順を決めているだけです。
そして、タスク1のパラメタに注目してみると、「bash_command」にBashシェルコマンドを直書きしています。つまり、この部分を変えれば、Java言語などで別途開発した実行ファイルを実行できます。あとは、タスクの実行順を簡単なバッチ処理の完成です。
1 2 3 4 |
t1 = BashOperator( task_id='print_date', bash_command='date', dag=dag) |
タスクの動きはBaseOperatorのパラメタから理解する
上記で簡単なバッチ処理ができました。次はタスクエラー時のリトライ処理やメール通知です。これがプログラミングに不慣れだと混乱すると思います。
最初に結論からいうと、エラー時の動作などの各タスクの共通の設定は、纏めてパラメタ群(チュートリアルのdefault_args)で設定しています。そして、共通のパラメタとして何が設定できるかは、BaseOperatorファンクションのパラメタから読み取ってください。
なんでそんな作りになっているかは、BashOperatorなどのタスクファンクション(Airflowの説明ではオペレーターファンクション)などはBaseOperatorファンクションを土台に機能追加したものです。(構造的のは共通機能はBaseOperatorファンクションに纏められている)そのため、各タスクファンクション固有のパラメタは各タスクファンクションに説明がありますが、共通のパラメタについてはBaseOperatorファンクションにしかありません。
また、スケジューリング全体の設定はDAGファンクションのパラメタから設定してください。基本的に実行頻度(schedule_interval)パラメタ以外はチュートリアルの内容から変えなくても問題ないと思いますが、キャッチアップ(スケジュール開始時に過去の未完了分を実行)したくない場合は下記のように追記すれば回避できます。
1 2 |
dag = DAG('tutorial', default_args=default_args) dag.catchup = False #追加 |
最低限知っておきたいコマンド
AirflowはWebベースの管理コンソールから定数設定、ログの確認などはできますが、スケジュールのリセットや実行、管理コンソールの起動などの主要機能はコマンド(CLI)で行う必要があります。これらコマンドの説明も公式ドキュメントにあるのですが、これも非常に分かりにくいです。そのため、Airflowを動かすために最低限必要なコマンドについて簡単に説明します。
airflow initdb
Airflowインストール後に実行するコマンドです。これを実行することでAirflow内の管理情報やログが記録されているローカルデータベースを初期化します。同様に定義情報などを消す「airflow resetdb」がありますが、これらはAirflowの設定情報を初期化してしまうため、インストール直後以外は実行しない方が良いです。
airflow webserver
Webベースの管理コンソールを起動します。ただ、管理コンソールの起動だけですので、ジョブを実行する場合には下記の「airflow scheduler」を実行してください。また、Pythonプログラムによるジョブの変更結果はスケジュールとして反映されますが、管理コンソールの表示までは反映されないことがあります。そのため、ジョブの新規作成、ジョブ内のタスクの追加・変更・削除を行った場合は、管理コンソールの再起動(一度落として実行)をしてみてください。
また、初期状態のままだと管理コンソール内のリンク指定が正しくありませんので、Airflowのインストール先にある「airflow.cfg」ファイルの「base_url」や「endpoint_url」を書き換えてください。(送信メールサーバの設定値なども「airflow.cfg」にあります)
airflow scheduler
Airflow内の有効なジョブ(DAG)のスケジュールを実行します。作成したばかりのジョブは初期状態で一時停止状態(pause)になっています。そのため、下記のように管理コンソールで有効(On)にするか、「airflow unpause」コマンドを実行してください。
airflow list_dags
Airflow内の有効なDAG名をリスト表示します。コマンド実行結果に作成したDAG名がない場合はPythonプログラムに記述ミスでエラーになっている場合があります。その場合は「python hogehoge.py」のようにコマンド実行して、エラー個所を発見してください。
airflow list_tasks [DAG名] –tree
DAG内のタスク名をツリーリストで表示します。
airflow test [DAG名] [タスク名] YYYY-MM-DD
DAG内のタスクをテスト実行します。コマンド内の「YYYY-MM-DD」はテスト実行する時の日付になり、タスクの実行開始日(チュートリアルの場合は default_args 内の start_date)以降を指定します。テスト実行することで実際に処理が行われ、ログも記録されます。
癖は強いが武器になる!っと信じて!!
Airflowは非常に多機能でPythonプログラミングスキルはそれほど必要ありません。しかし、公式ドキュメントは凄さをアピールしすぎで、親切さが欠けているような気がします。個人的にはチュートリアルの後にAirflowの全体像の説明が欲しかったです。
今回ご説明したように「やること」を限定してしまえば、使うファンクションもコマンドもそれほど多くありません。そのため、脳みそが受入れ拒否しても我慢してチュートリアルをカスタマイズしながら勉強してください。そのうちAirflowの癖が理解できると思います。
次回は実際に運用中のジョブを開発して分かったタスク開発のポイントについてお伝えしたいと思います。
連載:Apache Airflow でタスクスケジューリングしてみた
- Airflowによって開発負荷が変わる
- 公式ドキュメントの読み解き方 (本編)
- タスク開発のポイント
- ログを退避させる