Python: Luigiでデータパイプラインを作る
バッチ処理の実装にLuigiを使う機会があり、少し調べて整理しました。
irisデータセットをダウンロードしてきて、scikit-learnで学習したモデルをS3にアップロードする、簡単なサンプルも作ります。
Luigi
Luigiは、データパイプラインを記述するためのPythonフレームワークです。
特徴
- タスク間の依存関係を定型的に定義できる
- 失敗した(アウトプットが生成できなかった)タスクから再開できる
- スケジューリングはできない
- スタートキックは別の誰かが行う必要がある
ユースケース
処理間に依存関係があり、各処理が複雑な場合に役立ちます。
- 例えば、データの前処理やパラメータ最適化などを含む複雑な処理フローを構造化して実装したい
- 例えば、インプットとなるデータ量が膨大なので、適切に中間生成物を作りながら処理したい
一方で、ジョブのスケジューリングは別の機構(cronやAWS Data PipelineやAirFlowなど)が必要となります。
そうしたジョブスケジューラから呼び出され、依存関係があるまとまった処理を行うバッチ、という位置づけが良さそうです。
基本的な使い方
はじめにluigiをインストールします。
pip install luigi
最初に簡単な2つのタスクを定義して、動きを見ていきます。
- Task1: intermediate/task1.txtにファイルを出力
- Task2: intermediate/task2.txt(Task1の出力)を読み込み、output/task2.txtにファイル出力
フォルダ構造は以下のとおりです。main.pyが今回実装するプログラムです。python main.py
で実行できます。
. ├── Intermediate ├── main.py └── output
main.pyの実装を示します。
import luigi import time class Task1(luigi.Task): task_namespace = 'tasks' def run(self): print("Task1: run") with self.output().open("w") as target: target.write(f"This file was generated by Task1 at {time.asctime()}.") def output(self): print("Task1: output") return luigi.LocalTarget("intermediate/task1.txt") class Task2(luigi.Task): task_namespace = 'tasks' def requires(self): print("Task2: requires") return Task1() def run(self): print("Task2: run") with self.input().open("r") as intermediate, self.output().open("w") as target: task1_text = intermediate.read() target.write(f"{task1_text}\n") target.write(f"This file was generated by Task2 at {time.asctime()}.") def output(self): print("Task2: output") return luigi.LocalTarget("output/task2.txt") if __name__ == '__main__': luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler'])
Luigiでは、入力・処理・出力の一連の処理をTaskクラス(あるいはその継承クラス)を継承することで定義します。
Taskクラスでは、requires、output、runの3つのメソッドを主に実装します。
- requiresメソッドでは、前段のタスクを記述し、依存関係を定義する
- outputメソッドでは、出力先を定義する
- runメソッドでは、出力を生成する処理を記述する
- outputメソッドがFalse(存在しない)の場合のみ実行される
Task1では、依存するタスクが無いため、outputとrunのみを定義しています。
- outputは、
luigi.LocalTarget("intermediate/task1.txt")
でintermediate/task1.txtファイルを出力先に指定- LocalTarget以外にも、luigi.contrib.s3.S3Targetやluigi.contrib.bigquery.BigqueryTargetなどが提供されています (参考)
- runは、出力先をouputメソッドで取得し、文字列(
This file was generated by Task1 at Sat Mar 31 13:47:18 2018.
)を書き込んでいます
Task2では、requires、output、putを定義してます。
- requiresでは、依存するTask1を生成して返すことで、先にTask1を実行させるようにします
- runでは、Task1の出力をinputメソッドで取得しています
上を実行すると、task1.txtとtask2.txtが生成されます。
. ├── Intermediate │ └── task1.txt ├── input ├── main.py └── output └── task2.txt
コンソールの表示からは以下の順番で実行されていることがわかります。また、Luigi Execution Summaryからは2つのタスクが正常に完了したことも確認できます。
DEBUG: Checking if tasks.Task2() is complete Task2: output Task2: requires DEBUG: Checking if tasks.Task1() is complete Task1: output Task1: run Task1: output Task2: requires Task1: output Task2: run Task2: requires Task1: output Task2: output ... ===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 2 ran successfully: - 1 tasks.Task1() - 1 tasks.Task2() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
ここで、Task2に問題がありtask2.txtのみが出力されなかった場合を想定して、task2.txtのみを削除します。main.pyを再実行すると、Task2のみが実行されます。
===== Luigi Execution Summary ===== Scheduled 2 tasks of which: * 1 present dependencies were encountered: - 1 tasks.Task1() * 1 ran successfully: - 1 tasks.Task2() This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
一方で、task1.txtのみを削除して再実行しても、task2.txtは存在するため何も行われません(最初から再作成されたりしません)。
===== Luigi Execution Summary ===== Scheduled 1 tasks of which: * 1 present dependencies were encountered: - 1 tasks.Task2() Did not run any tasks This progress looks :) because there were no failed tasks or missing external dependencies ===== Luigi Execution Summary =====
機械学習やS3と組み合わせたサンプル
応用例として、Irisデータセットを使って機械学習した結果をS3にアップロードする3つのタスクを定義します。
- iris_tasks.DownloadCsvTask: Irisデータセット(CSVファイル)をダウンロードして、intermediate/original.csvに出力する
- iris_tasks.CreateModelTask: original.csvを読み込み、SVMで機械学習してモデルを作り、intermediate/model.pklに出力する
- iris_tasks.UploadS3Task: model.pklを読み込み、S3にアップロードする
設定ファイルはフォルダ構成は以下のとおりです。
. ├── intermediate ├── luigi.cfg ├── main.py └── output
新たにluigi.cfg
という設定ファイルを追加しています。
Luigiでは、実行時のカレントパスに"luigi.cfg"を配置するか、または、"LUIGI_CONFIG_PATH"でファイルパスを指定することで、実行時に参照できるパラメータを設定できます。
luigi.cfgは以下のように記述しています。
タスクの名前(例えば[iris_tasks.DownloadCsvTask]
)を指定し、パラメータ(iris_tasks.DownloadCsvTaskの場合は、source_url
とintermediate_original_path
)を渡します。
[s3]
はS3のアップロードで必要な認証情報を渡しています。このあと触れるS3Targetでは、[s3]
で渡された認証情報を使ってS3にアクセスします。
[iris_tasks.DownloadCsvTask] source_url=https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data intermediate_original_path=intermediate/original.csv [iris_tasks.CreateModelTask] intermediate_model_path=intermediate/model.pkl [iris_tasks.UploadS3Task] s3_model_path=s3://test-s3-path/model.pkl [s3] aws_access_key_id=XXXXXXXXXXXXXXXXXXXX aws_secret_access_key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
それでは実装を示します。こちらもpython main.py
で実行できます。
import luigi from luigi.contrib.s3 import S3Target # pip install boto import requests import pandas import pickle import shutil from sklearn import svm # irisデータをダウンロードしてCSVファイルに出力 class DownloadCsvTask(luigi.Task): task_namespace = 'iris_tasks' source_url = luigi.Parameter() intermediate_original_path = luigi.Parameter() def run(self): with self.output().open("w") as target: response = requests.get(self.source_url) target.write(response.text) def output(self): return luigi.LocalTarget(self.intermediate_original_path) # irisの分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力 class CreateModelTask(luigi.Task): task_namespace = 'iris_tasks' intermediate_model_path = luigi.Parameter() def requires(self): return DownloadCsvTask() def run(self): with open(self.output().path, "wb") as target: # self.output().open("wb") ではエラーが発生した # https://github.com/spotify/luigi/issues/1647 data = pandas.read_csv(self.input().path, header=None) x = data.iloc[:, [0, 1, 2, 3]] y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}) model = svm.SVC() model.fit(x, y) pickle.dump(model, target) def output(self): # バイナリファイルを出力する場合は、formatにNopFormatを指定 return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat) # pklファイルをS3のバケットにアップロード class UploadS3Task(luigi.Task): task_namespace = 'iris_tasks' s3_model_path = luigi.Parameter() def requires(self): return CreateModelTask() def run(self): with self.output().open("w") as target: # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー shutil.copy(self.input().path, target.tmp_path) def output(self): # S3Targetでアップロードできる(botoのインストールが必要) return S3Target(self.s3_model_path) if __name__ == '__main__': luigi.run(['iris_tasks.UploadS3Task', '--workers', '1', '--local-scheduler'])
最初のDownloadCsvTaskでは、irisのデータをダウンロードしてCSVファイルでローカルに保存します。
luigi.cfgで渡されたパラメータですが、変数名 = luigi.Parameter()
とすることで、変数名と一致するパラメータの値が得られます。
ここでは、[iris_tasks.DownloadCsvTask]
からsource_url
とintermediate_original_path
の値を取り出しています。
class DownloadCsvTask(luigi.Task): task_namespace = 'iris_tasks' source_url = luigi.Parameter() intermediate_original_path = luigi.Parameter() def run(self): with self.output().open("w") as target: response = requests.get(self.source_url) target.write(response.text) def output(self): return luigi.LocalTarget(self.intermediate_original_path)
2番目にCreateModelTaskが実行されます。前段のDownloadCsvTaskからoriginal.csvを受け取り、分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力しています。
これまでと同様LocalTargetを使っていますが、バイナリで出力する場合は、formatオプションにNorFormatを指定します。
ちなみに、runメソッドでファイルをオープンするときは、open(self.output().path, "wb")
としてます(バージョンの問題なのか self.output().open("wb")
ではだめでした)。
class CreateModelTask(luigi.Task): task_namespace = 'iris_tasks' intermediate_model_path = luigi.Parameter() def requires(self): return DownloadCsvTask() def run(self): with open(self.output().path, "wb") as target: # self.output().open("wb") ではエラーが発生した # https://github.com/spotify/luigi/issues/1647 data = pandas.read_csv(self.input().path, header=None) x = data.iloc[:, [0, 1, 2, 3]] y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}) model = svm.SVC() model.fit(x, y) pickle.dump(model, target) def output(self): # バイナリファイルを出力する場合は、formatにNopFormatを指定 return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat)
最後のUploadS3Taskでは、作成したpklファイルをS3のパスにアップロードします。
S3に出力する場合は、S3Targetを使うと自然に出力できます。
botoも事前にインストールしておく必要があります(内部的にはS3Clientでアクセスしています)。また、luigi.cfgの[s3]
で渡したaws_access_key_id
とaws_secret_access_key
の値が認証情報として使われます。
class UploadS3Task(luigi.Task): task_namespace = 'iris_tasks' s3_model_path = luigi.Parameter() def requires(self): return CreateModelTask() def run(self): with self.output().open("w") as target: # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー shutil.copy(self.input().path, target.tmp_path) def output(self): # S3Targetでアップロードできる(botoのインストールが必要) return S3Target(self.s3_model_path)
まとめ
ジョブスケジューラを使うまででもないけど、データの受け渡しを伴う処理だったり、時間がかかる処理を分けたい場合に使えそうですね。