目次
GKE を効率的に使うために
弊社の機械学習基盤では、ワークフロー管理ツールとして Cloud Composer (Airflow) を利用しており、機械学習タスクは別の Google Kubernetes Engine (以下、 GKE) クラスタ上で実行する構成を取っています。
GKE では複数の node pool を定義できるため、予め用途に応じた複数の node pool を用意しておくことで、タスクに応じた環境の割当てを容易に実現することができます。(現在は β版の提供に留まっていますが、GKE 側で利用リソースに応じて自動的に node pool の作成・削除を行ってくれる node-autoprovisioning も非常に有用です)
例えば、以下のような活用が考えられます。
- 機械学習モデルそれぞれに適した環境の割り当て(CPU重視、メモリ重視、GPU 利用有無、など)
- データセットのサイズに応じたスペックの変更
- preemptible インスタンスを利用して安価にハイスペックなマシンを利用
このように GKE を使用することによって柔軟な処理基盤を構築することができますが、 Cloud Composer と組み合わせる場合にはいくつか留意しておくべき点があります。
Kubernetes Executor
Airflow には Executor という概念があり、DAG 内で定義された各タスクをどのような方式で実行するか、制御方法を指定することができます。この Executor の 1 つとして Kubernetes Executor というものがあり、実行ノードの指定やボリュームのマウントなどの Kubernetes の機能を享受しながら、各タスクを別の Pod で独立に実行することができます。
しかし、最大の問題点として Cloud Composer は Celery Executor 以外の Executor を2020年2月現在時点ではサポートしていません。よって、この方法はそもそも利用できないということになります。
参考までに、Airflow で現在利用できるExecutor の一覧はこちらから確認できます。
KubernetesPodOperator (GKEPodOperator)
Kubernetes の Pod 上でタスクを実行する別の方法として、 KubernetesPodOperator があります。(GKE 専用の Operator として GKEPodOperator が別に存在していますが、機能は基本的に同一です)こちらは単一のタスクを Kubernetes の Pod 上で実行する Operator なので、 Celery Executor からでも利用できる点が前述の Kubernetes Executor と大きく異なります。
実際に GKEPodOperator を利用して Cloud Composer とは別の GKE クラスタで処理を行うワークフローを作成すると以下のようになります。ここでは詳しく紹介しませんが、 Kubernetes のマニフェスト (yaml) で書いていた内容が Python のコードに変わっただけなので、 Torelation や Volume, NodeSelector, Resource Requests/Limits などのパラメータも指定することが可能です。詳細はGCP の公式ページなどをご参照ください。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
project = 'my-project' location = 'us-central1-a' cluster = 'my-cluster' GKEPodOperator( task_id='gke_pod_operator_test', # Cloud Composer とは別のクラスタを使用するため False in_cluster=False, # 使用する GKE クラスタ の設定 cluster_context=f'gke_{project}_{location}_{cluster}', cluster_name=cluster, project_id=project, location=location, # pod のパラメータを指定 namespace='default', name='test-task', image=f'gcr.io/{project}/test:0.1', image_pull_policy='IfNotPresent', cmds=['pwd'], ) |
これで万事解決!と言いたいところでしたが、現実はそう甘くはありませんでした。
KubernetesPodOperator で実行されるタスクはタスクごとに独立した Pod 上で実行されますが、あくまで Pod であり Kubernetes の Job ではありません。そのため、リトライや並列実行などの機能は一切利用できず、リトライ管理や並列実行制御などを全て Cloud Composer 側で行う必要があります。もちろん Airflow が提供するタスクや DAG のリトライ機能は利用できますが、全てのジョブの実行状態を Cloud Composer 側で持つ場合、 DAG の並列数や同時実行タスク数の上限にもより気を配る必要性が出てきます。
また、 Pod の起動に失敗したタスクはエラーとされてしまうため、リソースが確保できるまでスケジューリングを待機する Kubernetes の仕組みとはあまり相性がよくありません。タスクを実行する GKE のリソースが逼迫している場合、タイムアウト時間を非常に長くとるか、リソースが空くまで監視し続ける必要があります。
以上の理由から、弊社の環境では別の策を考えることになりました。
BashOperator と kubectl コマンドの利用
Airflow 公式では Kubernetes の Job としてタスクを実行できる Operator を提供していないため、今回は愚直に BashOperator で kubectl を叩く方式を採用しました。
最適な実現方法ではないかもしれませんが、メンテナンスコストや実装の手間を考えると、お手軽に利用できるのが BashOperator の利点です。以下に Composer から別の GKE クラスタに対して Job の実行と終了待ちを行う Bash スクリプトの例を記載しておきます。この例では全てのパラメータを固定値としていますが、実用上は xcom や Operator 実行時のパラメータを埋め込むことで、より柔軟に処理内容を変更することができます。(ここでは取り上げませんが、 Airflow Sensor を使うことで、より柔軟な処理待ちの機構を実現できます。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
#!/usr/bin/env bash set -u -o pipefail # Composer 側でジョブをタイムアウトと判断させる秒数 JOBS_TIMEOUT=${JOBS_TIMEOUT:-3600} # ジョブの作成までのタイムアウト時間 CREATE_JOBS_TIMEOUT=${CREATE_JOBS_TIMEOUT:-60} # ジョブの終了確認を行うインターバル CHECK_INTERVAL=${CHECK_INTERVAL:-10} # 開始・終了時間の設定 readonly start_time=$(date +%s) readonly k8s_timeout_limit=$((start_time + CREATE_JOBS_TIMEOUT)) readonly end_time=$((start_time + JOBS_TIMEOUT)) # Job を実行させる GKE クラスタ readonly project_name="my-project" readonly zone="us-central1-a" readonly cluster_name="my-cluster" readonly gke_cluster="gke_${project_name}_${zone}_${cluster_name}" # k8s のマニフェストファイル readonly yaml_path="test-manifest.yaml" # 実行する Job の数 readonly num_jobs=10 # credential が未取得の場合は取得する if [ "$(kubectl config get-clusters | grep -c "${gke_cluster}")" -eq 0 ]; then gcloud config set container/use_application_default_credentials true gcloud container clusters get-credentials "${cluster_name}" \ --project "${project_name}" \ --zone "${zone}" fi # Job の作成が完了するまでループする while true; do kubectl create -f "${yaml_path}" --cluster "${gke_cluster}" if [ $? -eq 0 ]; then break else sleep "${CHECK_INTERVAL}" fi now=$(date +%s) if [ "${now}" -gt "${k8s_timeout_limit}" ]; then echo "ジョブが作成できませんでした。" exit 1 fi done # 全ての Job が終了するまでループする while true; do now=$(date +%s) if [ "${now}" -gt "${end_time}" ]; then echo "ジョブがタイムアウトしました。" exit 1 fi # kubectl wait では複数の Job を待機できないため、完了・失敗した Job の数をカウントする # 実行時の設定に合わせて selector なども指定する failed_jobs=$( kubectl get jobs -n default \ --cluster "${gke_cluster}" \ -o jsonpath='{range .items[?(@.status.conditions[0].type=="Failed")]}{.metadata.name}{"\n"}{end}' ) complete_jobs=$( kubectl get jobs -n default \ --cluster "${gke_cluster}" \ -o jsonpath='{range .items[?(@.status.conditions[0].type=="Complete")]}{.metadata.name}{"\n"}{end}' ) # 全ての Job 名に "test-task" と入っている前提 num_failed_jobs=$(echo "${failed_jobs}" | grep -c "test-task") num_complete_jobs=$(echo "${complete_jobs}" | grep -c "test-task") num_finished_jobs=$((num_failed_jobs + num_complete_jobs)) if [ "${num_finished_jobs}" -eq "${num_jobs}" ]; then break fi sleep "${CHECK_INTERVAL}" done |
Cloud Composer の今後への期待
Cloud Composer は Airflow へのバージョン追従が遅い点やランニングコストの面でまだまだ課題を感じる部分もありますが、マネージドサービスとして気軽に利用できるのが良いところだと感じています。
Kubernetes との連携を考えると Argo や Flyte など別のワークフローエンジンも候補に挙がるかとは思いますが、簡単に使い始められることや Dataflow など他の GCP 製品との連携が容易なことが Cloud Composer の利点だと思います。興味を持たれた方はぜひ試してみてください。
Masaaki Hirotsu
MLOps Div. 所属 / Kaggle Master
機械学習・データ分析基盤の構築に関わる事例や、クラウドを活用したアーキテクチャについて発信していきます。