前回の投稿(Python: Luigiでデータパイプラインを作る - け日記)では、Pythonのデータパイプラインフレームワーク Luigi を使って依存関係のあるタスクを定義しました。
前回は1つ1つコマンドライン上で実行結果を見ていましたが、今回はCentral Schedulerを使って実行結果を見やすくします。
Central Scheduler
Central SchedulerはLuigi付属のサービスで、タスクの実行結果をWebブラウザから見ることができるようになります。
まず、luigidコマンドでサービスデーモンを起動します(Luigiがインストールされていることが前提です。pip install luigi
でインストールできます)。
$ luigid --background --pidfile ./tmp/pidfile --logdir ./log --state-path ./tmp/state
--state-path
オプションで、luigidの最新状態を保存するファイルを指定します。
luigidを終了すると、--state-path
で指定したファイルに、その状態をpickleでシリアライズしてファイルに残しています。次回の起動時には、--state-path
のファイルをデシリアライズして読み込むことで、最新状態を復元しています。
luigidコマンドによりサービスが立ち上がり、localhost:8082
でアクセスできるようになります(--port
オプションで任意のポートを起動時に指定できます)。
ブラウザからhttp://localhost:8082
にアクセスすると、以下のような管理画面が現れます。初期状態のため、実行結果はまだ表示されていません。
それでは、サービスを立ち上げた状態で、前回の投稿で作成したtasksを実行します。
main.pyの実装を示します。前回とほぼ同じですが、唯一異なるのがluigi.run()
の引数から、--local-scheduler
オプションを除いている点です。これにより、localhost:8082
にアクセスしてタスクが実行されるようになります。
import luigi import time class Task1(luigi.Task): task_namespace = 'tasks' def run(self): print("Task1: run") with self.output().open("w") as target: target.write(f"This file was generated by Task1 at {time.asctime()}.") def output(self): print("Task1: output") return luigi.LocalTarget("intermediate/task1.txt") class Task2(luigi.Task): task_namespace = 'tasks' def requires(self): print("Task2: requires") return Task1() def run(self): print("Task2: run") with self.input().open("r") as intermediate, self.output().open("w") as target: task1_text = intermediate.read() target.write(f"{task1_text}\n") target.write(f"This file was generated by Task2 at {time.asctime()}.") def output(self): print("Task2: output") return luigi.LocalTarget("output/task2.txt") if __name__ == '__main__': #luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler']) luigi.run(['tasks.Task2', '--workers', '1'])
作成したmain.pyを$ python main.py
で実行した後、再度http://localhost:8202
を見ると、tasks.Task1とtasks.Task2の実行結果が表示されていることがわかります。
Actionsのアイコンをクリックすると、タスクの依存関係もグラフ化して表示できます。
実行履歴の永続化
デフォルトでは、最新状態のみを持ち、ジョブの実行履歴は保持されません。
実行履歴をDBで永続化させるためには、設定ファイルluigi.cfgを以下のように編集する必要があります。
db_connection
で永続化に使うDBの接続文字列を設定します。ここでは、sqlite3のファイルを指定しています。DBを作っただけで、テーブルの作成等は行っていません。
[scheduler] record_task_history = True [task_history] db_connection = sqlite:///./db/history.sqlite
luigidを再起動してmain.pyを実行した後、http://localhost:8082/history
にアクセスすると、タスクの実行履歴が表示されます。
各タスクをクリックすると、いつ実行開始され、いつ完了したのかなどの詳細な情報も閲覧できます。
作成したDBにアクセスすると、3つのテーブルが作成され、実行履歴が永続化されていることがわかります。
$ sqlite3 ./db/history.sqlite sqlite> .table task_events task_parameters tasks sqlite> select * from tasks ; 1|tasks.Task2__99914b932b|tasks.Task2|XXXX 2|tasks.Task1__99914b932b|tasks.Task1|XXXX sqlite> select * from task_events ; 1|1|PENDING|2018-04-14 10:10:11.890329 2|2|PENDING|2018-04-14 10:10:11.905884 3|2|RUNNING|2018-04-14 10:10:11.924232 4|2|DONE|2018-04-14 10:10:11.946267 5|1|RUNNING|2018-04-14 10:10:11.960480 6|1|DONE|2018-04-14 10:10:11.979264
まとめ
Luigiのタスクの実行結果をCentral Schedulerで見やすくしました。
Central Schedulerからタスクを起動するなどはできませんが、Luigiとセットで導入すれば、日々の実行時間の確認や問題発生時の原因調査などの役に立つかと思います。