Python: Luigiの実行結果をCentral Schedulerで見やすくする

前回の投稿(Python: Luigiでデータパイプラインを作る - け日記)では、Pythonのデータパイプラインフレームワーク Luigi を使って依存関係のあるタスクを定義しました。

前回は1つ1つコマンドライン上で実行結果を見ていましたが、今回はCentral Schedulerを使って実行結果を見やすくします。

github.com

Central Scheduler

Central SchedulerはLuigi付属のサービスで、タスクの実行結果をWebブラウザから見ることができるようになります。

まず、luigidコマンドでサービスデーモンを起動します(Luigiがインストールされていることが前提です。pip install luigiでインストールできます)。

$ luigid --background --pidfile ./tmp/pidfile --logdir ./log --state-path ./tmp/state

--state-pathオプションで、luigidの最新状態を保存するファイルを指定します。
luigidを終了すると、--state-pathで指定したファイルに、その状態をpickleでシリアライズしてファイルに残しています。次回の起動時には、--state-pathのファイルをデシリアライズして読み込むことで、最新状態を復元しています。

luigidコマンドによりサービスが立ち上がり、localhost:8082でアクセスできるようになります(--portオプションで任意のポートを起動時に指定できます)。
ブラウザからhttp://localhost:8082にアクセスすると、以下のような管理画面が現れます。初期状態のため、実行結果はまだ表示されていません。

f:id:ohke:20180414103706p:plain

それでは、サービスを立ち上げた状態で、前回の投稿で作成したtasksを実行します。

main.pyの実装を示します。前回とほぼ同じですが、唯一異なるのがluigi.run()の引数から、--local-schedulerオプションを除いている点です。これにより、localhost:8082にアクセスしてタスクが実行されるようになります。

import luigi
import time


class Task1(luigi.Task):
    task_namespace = 'tasks'

    def run(self):
        print("Task1: run")
        with self.output().open("w") as target:
            target.write(f"This file was generated by Task1 at {time.asctime()}.")

    def output(self):
        print("Task1: output")
        return luigi.LocalTarget("intermediate/task1.txt")


class Task2(luigi.Task):
    task_namespace = 'tasks'

    def requires(self):
        print("Task2: requires")
        return Task1()

    def run(self):
        print("Task2: run")
        with self.input().open("r") as intermediate, self.output().open("w") as target:
            task1_text = intermediate.read()

            target.write(f"{task1_text}\n")
            target.write(f"This file was generated by Task2 at {time.asctime()}.")

    def output(self):
        print("Task2: output")
        return luigi.LocalTarget("output/task2.txt")


if __name__ == '__main__':
    #luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler'])
    luigi.run(['tasks.Task2', '--workers', '1'])

作成したmain.pyを$ python main.pyで実行した後、再度http://localhost:8202を見ると、tasks.Task1とtasks.Task2の実行結果が表示されていることがわかります。

f:id:ohke:20180414105731p:plain

Actionsのアイコンをクリックすると、タスクの依存関係もグラフ化して表示できます。

f:id:ohke:20180414110234p:plain

実行履歴の永続化

デフォルトでは、最新状態のみを持ち、ジョブの実行履歴は保持されません。

実行履歴をDBで永続化させるためには、設定ファイルluigi.cfgを以下のように編集する必要があります。
db_connectionで永続化に使うDBの接続文字列を設定します。ここでは、sqlite3のファイルを指定しています。DBを作っただけで、テーブルの作成等は行っていません。

[scheduler]
record_task_history = True

[task_history]
db_connection = sqlite:///./db/history.sqlite

luigidを再起動してmain.pyを実行した後、http://localhost:8082/historyにアクセスすると、タスクの実行履歴が表示されます。

f:id:ohke:20180414111125p:plain

各タスクをクリックすると、いつ実行開始され、いつ完了したのかなどの詳細な情報も閲覧できます。

作成したDBにアクセスすると、3つのテーブルが作成され、実行履歴が永続化されていることがわかります。

$ sqlite3 ./db/history.sqlite

sqlite> .table
task_events      task_parameters  tasks

sqlite> select * from tasks ;
1|tasks.Task2__99914b932b|tasks.Task2|XXXX
2|tasks.Task1__99914b932b|tasks.Task1|XXXX

sqlite> select * from task_events ;
1|1|PENDING|2018-04-14 10:10:11.890329
2|2|PENDING|2018-04-14 10:10:11.905884
3|2|RUNNING|2018-04-14 10:10:11.924232
4|2|DONE|2018-04-14 10:10:11.946267
5|1|RUNNING|2018-04-14 10:10:11.960480
6|1|DONE|2018-04-14 10:10:11.979264

まとめ

Luigiのタスクの実行結果をCentral Schedulerで見やすくしました。
Central Schedulerからタスクを起動するなどはできませんが、Luigiとセットで導入すれば、日々の実行時間の確認や問題発生時の原因調査などの役に立つかと思います。