仕事でApache AirFlowを使う機会がありましたので、調査がてらに、Dockerで環境を構築し、簡単なジョブを定義します。
AirFlow
AirFlowはジョブのスケジューリング・監視を、コード(主にPython)で定義・制御するためのプラットフォームです。ワークフロー、データフローの実装・運用に使われます。
github.com
類似のソフトウェアには、前回・前々回で紹介したLuigiや、Treasure DatasのDigDagなどがあります。
- AirFlowではスケジューラも提供されているため、Luigiよりも広い範囲をカバーします
- ワークフローを、AirFlowではPython、DigDagでは.dagファイルで定義します
Dockerで環境構築
ローカルに環境をインストールすることもできますが、Dockerでサクッと環境構築します。
以下のレポジトリからdocker-compose-CeleryExecutor.ymlをダウンロードしてきます。
GitHub - puckel/docker-airflow: Docker Apache Airflow
そして、docker-composeでビルド・起動まで行います。合計6個のコンテナが稼働していることが確認できます。
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
da8e02c7fc22 puckel/docker-airflow:1.9.0-2 "/entrypoint.sh work…" 26 seconds ago Up 29 seconds 5555/tcp, 8080/tcp, 8793/tcp airflow_worker_1
73ebe8057b5a puckel/docker-airflow:1.9.0-2 "/entrypoint.sh sche…" 28 seconds ago Up 30 seconds 5555/tcp, 8080/tcp, 8793/tcp airflow_scheduler_1
f66972f4f5d5 puckel/docker-airflow:1.9.0-2 "/entrypoint.sh webs…" 29 seconds ago Up 31 seconds (healthy) 5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp airflow_webserver_1
1ac77f898545 puckel/docker-airflow:1.9.0-2 "/entrypoint.sh flow…" 29 seconds ago Up 31 seconds 8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp airflow_flower_1
99b6fce9e010 postgres:9.6 "docker-entrypoint.s…" 30 seconds ago Up 32 seconds 5432/tcp airflow_postgres_1
7344396dc041 redis:3.2.7 "docker-entrypoint.s…" 30 seconds ago Up 32 seconds 6379/tcp airflow_redis_1
ポート8080がAirFlowコンテナにフォワードされていますので、ブラウザからhttp://localhost:8080
でアクセスすると、画面が表示されます。まだ何も定義していないので、空です。
DAGの定義と実行
それではDAGを定義していきます。
プロジェクトルートに作成されるdagsディレクトリ配下にPythonファイルを追加することで、DAGを定義します。
AirFlowでは、デフォルトで$AIRFLOW_HOME/dags
以下にあるPythonファイルをDAGとして認識・実行するようになっています。設定ファイル$AIRFLOW_HOME/airflow.cfg
で変更可能です。
最初のDAGとしてhello_airflow.py
を作成します。
起動したairflow_webserver_1コンテナはこのdagsディレクトリを$AIRFLOW_HOME/dags
にマウントされています。そのため、dagsディレクトリにファイルを追加すると、DAGとして認識されます。
.
├── dags
│ └── hello_airflow.py
└── docker-compose-CeleryExecutor.yml
hello_airflow.py
は以下のように定義します。基本的な流れは、DAGを作る、DAGと紐づくタスクを作る、タスク間に依存関係を定義するの3段階です。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'ohke',
'depends_on_past': False,
'start_date': datetime(2018, 4, 21, 10, 0, 0),
'schedule_interval': timedelta(days=1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('first_dag', default_args=default_args)
t1 = BashOperator(
task_id='t1',
bash_command='echo t1',
dag=dag)
t2 = BashOperator(
task_id='t2',
bash_command='exit 1',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='t3',
bash_command='echo "{{ params.greeting }}"',
params={'greeting': 'Hello, AirFlow!'},
dag=dag)
t4 = BashOperator(
task_id='t4',
bash_command='echo t4',
dag=dag
)
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
DAGを作る
DAGクラスをインスタンス化することでDAGを作ります。
- 最初の引数で、DAGのIDを設定
default_args
で、各種パラメータを設定
start_date
でDAGの実行開始日時、schedule_interval
で実行間隔(ここでは1日おき)
retries
でリトライ回数、retry_delay
でリトライ間隔(ここでは5分おき)
[4/29 追記]
ややこしいのですが、最初のタスクの実行日時は、start_date
ではなく、start_date+schedule_interval
となります。
例えば、start_date
を4/22 0時、schedule_interval
をdaily(timedelta(days=1)
)に設定すると、最初にジョブが実行されるのは4/23 0時です 。
イメージとしては、start_date
で指定した日時からデータが蓄積され始め、schedule_interval
が経過したらdailyであれば1日分のデータを処理を行う、という感じです。
default_args = {
'owner': 'ohke',
'depends_on_past': False,
'start_date': datetime(2018, 4, 22, 0, 0, 0),
'schedule_interval': timedelta(days=1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('first_dag', default_args=default_args)
DAGと紐づくタスクを作る
ここではt1〜t4の4つのタスクを定義し、作成したdagと紐付けています。
AirFlowでは、典型的ないくつかのタスクのためにOperatorが定義されています。
今回は、シェルスクリプトを実行するBashOperatorを使っていますが、これ以外にもPythonOperatorやPostgresOperatorなどが提供されています。
t1 = BashOperator(
task_id='t1',
bash_command='echo t1',
dag=dag)
t2 = BashOperator(
task_id='t2',
bash_command='exit 1',
retries=3,
dag=dag)
t3 = BashOperator(
task_id='t3',
bash_command='echo "{{ params.greeting }}"',
params={'greeting': 'Hello, AirFlow!'},
dag=dag)
t4 = BashOperator(
task_id='t4',
bash_command='echo t4',
dag=dag
)
タスク間に依存関係を定義する
Operatorクラスにはタスク間の依存関係を設定するメソッドが定義されています。
set_upstream()で待つタスクを設定できます。以下の例では、t1の後にt2とt3、t2・t3の後にt4が実行されます。
逆に、自身のタスク完了後に別のタスクを実行する関係はset_downstream()で定義できます。
AIrFlow 1.8以降では、メソッド名の代わりにビット演算子を使うこともでき、例えばt1 >> t2
やt2 << t1
(いずれもt1→t2の順で実行)と記述できます。
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])
DAGの実行
画面に戻ってみますと、新たに追加したfirst_dag
が追加されていることが確認できます。
DAG名をクリックして依存関係のツリーも表示できます。t1の後にt2とt3が、t2とt3の後にt4がそれぞれ実行されることが可視化されています。
DAGを実行する場合、ToggleをOnにします。これで時間が来たら実行されます。LinksからTrigger Dagをクリックすると、DAGが即時実行されます。
DAG RunsにそのDAGの成功・失敗の回数、Recent Tasksには直近実行されたDAGの各タスクのステータスが表示されます。下図では、DAGが1回成功し、DAG内の4つのタスク(t1〜t4)も全て正常完了していることがわかります。
タスクに失敗した場合の動きも確認します。
タスクt2が失敗するように変更(exitコード0以外で返す)し、Trigger Dagで即時実行させます。
t2 = BashOperator(
task_id='t2',
bash_command='exit 1',
retries=3,
dag=dag)
すると、DAGは失敗(DAG Runsの右端の赤丸数字がFailedを表す)し、タスクは2つ成功(t1とt3)、1つ失敗(t2)、1つは依存するタスク失敗による未実行(t4)というステータスになっています。ツリーでさらにタスクごとの成功・失敗を確認することもできます。
まとめ
DockerでAirFlowの環境を構築し、簡単なジョブを作って実行させてみました。
今回は紹介できませんでしたが、Pythonコードを実行できるPythonOperatorや、特定の条件を満たすまで次のタスクの実行を待たせるための各種Sensorなどを組み合わせると、さらに柔軟かつ簡単に、複雑なフローを定義することもできるようになります。またの機会で触れていきます。