Apache AirFlowをDocker環境で構築して簡単なジョブを作る

仕事でApache AirFlowを使う機会がありましたので、調査がてらに、Dockerで環境を構築し、簡単なジョブを定義します。

AirFlow

AirFlowはジョブのスケジューリング・監視を、コード(主にPython)で定義・制御するためのプラットフォームです。ワークフロー、データフローの実装・運用に使われます。

github.com

類似のソフトウェアには、前回・前々回で紹介したLuigiや、Treasure DatasのDigDagなどがあります。

  • AirFlowではスケジューラも提供されているため、Luigiよりも広い範囲をカバーします
  • ワークフローを、AirFlowではPython、DigDagでは.dagファイルで定義します

Dockerで環境構築

ローカルに環境をインストールすることもできますが、Dockerでサクッと環境構築します。

以下のレポジトリからdocker-compose-CeleryExecutor.ymlをダウンロードしてきます。

GitHub - puckel/docker-airflow: Docker Apache Airflow

そして、docker-composeでビルド・起動まで行います。合計6個のコンテナが稼働していることが確認できます。

$ docker-compose -f docker-compose-CeleryExecutor.yml up -d

$ docker ps
CONTAINER ID        IMAGE                           COMMAND                  CREATED             STATUS                    PORTS                                        NAMES
da8e02c7fc22        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh work…"   26 seconds ago      Up 29 seconds             5555/tcp, 8080/tcp, 8793/tcp                 airflow_worker_1
73ebe8057b5a        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh sche…"   28 seconds ago      Up 30 seconds             5555/tcp, 8080/tcp, 8793/tcp                 airflow_scheduler_1
f66972f4f5d5        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh webs…"   29 seconds ago      Up 31 seconds (healthy)   5555/tcp, 8793/tcp, 0.0.0.0:8080->8080/tcp   airflow_webserver_1
1ac77f898545        puckel/docker-airflow:1.9.0-2   "/entrypoint.sh flow…"   29 seconds ago      Up 31 seconds             8080/tcp, 0.0.0.0:5555->5555/tcp, 8793/tcp   airflow_flower_1
99b6fce9e010        postgres:9.6                    "docker-entrypoint.s…"   30 seconds ago      Up 32 seconds             5432/tcp                                     airflow_postgres_1
7344396dc041        redis:3.2.7                     "docker-entrypoint.s…"   30 seconds ago      Up 32 seconds             6379/tcp                                     airflow_redis_1

ポート8080がAirFlowコンテナにフォワードされていますので、ブラウザからhttp://localhost:8080でアクセスすると、画面が表示されます。まだ何も定義していないので、空です。

f:id:ohke:20180414142554p:plain

DAGの定義と実行

それではDAGを定義していきます。

プロジェクトルートに作成されるdagsディレクトリ配下にPythonファイルを追加することで、DAGを定義します。
AirFlowでは、デフォルトで$AIRFLOW_HOME/dags以下にあるPythonファイルをDAGとして認識・実行するようになっています。設定ファイル$AIRFLOW_HOME/airflow.cfgで変更可能です。

最初のDAGとしてhello_airflow.pyを作成します。
起動したairflow_webserver_1コンテナはこのdagsディレクトリを$AIRFLOW_HOME/dagsにマウントされています。そのため、dagsディレクトリにファイルを追加すると、DAGとして認識されます。

.
├── dags
│   └── hello_airflow.py
└── docker-compose-CeleryExecutor.yml

hello_airflow.pyは以下のように定義します。基本的な流れは、DAGを作る、DAGと紐づくタスクを作る、タスク間に依存関係を定義するの3段階です。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# DAGを作る
default_args = {
    'owner': 'ohke',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 21, 10, 0, 0),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('first_dag', default_args=default_args)

# DAGと紐づくタスクを作る
t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

# タスク間に依存関係を定義する
t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

DAGを作る

DAGクラスをインスタンス化することでDAGを作ります。

  • 最初の引数で、DAGのIDを設定
  • default_argsで、各種パラメータを設定
    • start_dateでDAGの実行開始日時、schedule_intervalで実行間隔(ここでは1日おき)
    • retriesでリトライ回数、retry_delayでリトライ間隔(ここでは5分おき)

[4/29 追記]
ややこしいのですが、最初のタスクの実行日時は、start_dateではなく、start_date+schedule_intervalとなります。
例えば、start_dateを4/22 0時、schedule_intervalをdaily(timedelta(days=1))に設定すると、最初にジョブが実行されるのは4/23 0時です 。
イメージとしては、start_dateで指定した日時からデータが蓄積され始め、schedule_intervalが経過したらdailyであれば1日分のデータを処理を行う、という感じです。

default_args = {
    'owner': 'ohke',
    'depends_on_past': False,
    'start_date': datetime(2018, 4, 22, 0, 0, 0),
    'schedule_interval': timedelta(days=1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('first_dag', default_args=default_args)

DAGと紐づくタスクを作る

ここではt1〜t4の4つのタスクを定義し、作成したdagと紐付けています。

AirFlowでは、典型的ないくつかのタスクのためにOperatorが定義されています。
今回は、シェルスクリプトを実行するBashOperatorを使っていますが、これ以外にもPythonOperatorPostgresOperatorなどが提供されています。

t1 = BashOperator(
    task_id='t1',
    bash_command='echo t1',
    dag=dag)

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='t3',
    bash_command='echo "{{ params.greeting }}"',
    params={'greeting': 'Hello, AirFlow!'},
    dag=dag)

t4 = BashOperator(
    task_id='t4',
    bash_command='echo t4', 
    dag=dag
)

タスク間に依存関係を定義する

Operatorクラスにはタスク間の依存関係を設定するメソッドが定義されています。

set_upstream()で待つタスクを設定できます。以下の例では、t1の後にt2とt3、t2・t3の後にt4が実行されます。
逆に、自身のタスク完了後に別のタスクを実行する関係はset_downstream()で定義できます。

AIrFlow 1.8以降では、メソッド名の代わりにビット演算子を使うこともでき、例えばt1 >> t2t2 << t1(いずれもt1→t2の順で実行)と記述できます。

t2.set_upstream(t1)
t3.set_upstream(t1)
t4.set_upstream([t2, t3])

DAGの実行

画面に戻ってみますと、新たに追加したfirst_dagが追加されていることが確認できます。

f:id:ohke:20180421093016p:plain

DAG名をクリックして依存関係のツリーも表示できます。t1の後にt2とt3が、t2とt3の後にt4がそれぞれ実行されることが可視化されています。

f:id:ohke:20180421093654p:plain

DAGを実行する場合、ToggleをOnにします。これで時間が来たら実行されます。LinksからTrigger Dagをクリックすると、DAGが即時実行されます。
DAG RunsにそのDAGの成功・失敗の回数、Recent Tasksには直近実行されたDAGの各タスクのステータスが表示されます。下図では、DAGが1回成功し、DAG内の4つのタスク(t1〜t4)も全て正常完了していることがわかります。

f:id:ohke:20180421100759p:plain

タスクに失敗した場合の動きも確認します。
タスクt2が失敗するように変更(exitコード0以外で返す)し、Trigger Dagで即時実行させます。

t2 = BashOperator(
    task_id='t2',
    bash_command='exit 1',
    retries=3,
    dag=dag)

すると、DAGは失敗(DAG Runsの右端の赤丸数字がFailedを表す)し、タスクは2つ成功(t1とt3)、1つ失敗(t2)、1つは依存するタスク失敗による未実行(t4)というステータスになっています。ツリーでさらにタスクごとの成功・失敗を確認することもできます。

まとめ

DockerでAirFlowの環境を構築し、簡単なジョブを作って実行させてみました。

今回は紹介できませんでしたが、Pythonコードを実行できるPythonOperatorや、特定の条件を満たすまで次のタスクの実行を待たせるための各種Sensorなどを組み合わせると、さらに柔軟かつ簡単に、複雑なフローを定義することもできるようになります。またの機会で触れていきます。