Python: Luigiでデータパイプラインを作る

バッチ処理の実装にLuigiを使う機会があり、少し調べて整理しました。
irisデータセットをダウンロードしてきて、scikit-learnで学習したモデルをS3にアップロードする、簡単なサンプルも作ります。

Luigi

Luigiは、データパイプラインを記述するためのPythonフレームワークです。

github.com

特徴

  • タスク間の依存関係を定型的に定義できる
  • 失敗した(アウトプットが生成できなかった)タスクから再開できる
  • スケジューリングはできない
    • スタートキックは別の誰かが行う必要がある

ユースケース

処理間に依存関係があり、各処理が複雑な場合に役立ちます。

  • 例えば、データの前処理やパラメータ最適化などを含む複雑な処理フローを構造化して実装したい
  • 例えば、インプットとなるデータ量が膨大なので、適切に中間生成物を作りながら処理したい

一方で、ジョブのスケジューリングは別の機構(cronやAWS Data PipelineやAirFlowなど)が必要となります。

そうしたジョブスケジューラから呼び出され、依存関係があるまとまった処理を行うバッチ、という位置づけが良さそうです。

基本的な使い方

はじめにluigiをインストールします。

pip install luigi

最初に簡単な2つのタスクを定義して、動きを見ていきます。

  1. Task1: intermediate/task1.txtにファイルを出力
  2. Task2: intermediate/task2.txt(Task1の出力)を読み込み、output/task2.txtにファイル出力

フォルダ構造は以下のとおりです。main.pyが今回実装するプログラムです。python main.pyで実行できます。

.
├── Intermediate
├── main.py
└── output

main.pyの実装を示します。

import luigi
import time


class Task1(luigi.Task):
    task_namespace = 'tasks'

    def run(self):
        print("Task1: run")
        with self.output().open("w") as target:
            target.write(f"This file was generated by Task1 at {time.asctime()}.")

    def output(self):
        print("Task1: output")
        return luigi.LocalTarget("intermediate/task1.txt")


class Task2(luigi.Task):
    task_namespace = 'tasks'

    def requires(self):
        print("Task2: requires")
        return Task1()

    def run(self):
        print("Task2: run")
        with self.input().open("r") as intermediate, self.output().open("w") as target:
            task1_text = intermediate.read()

            target.write(f"{task1_text}\n")
            target.write(f"This file was generated by Task2 at {time.asctime()}.")

    def output(self):
        print("Task2: output")
        return luigi.LocalTarget("output/task2.txt")


if __name__ == '__main__':
    luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler'])

Luigiでは、入力・処理・出力の一連の処理をTaskクラス(あるいはその継承クラス)を継承することで定義します。
Taskクラスでは、requires、output、runの3つのメソッドを主に実装します。

  • requiresメソッドでは、前段のタスクを記述し、依存関係を定義する
  • outputメソッドでは、出力先を定義する
  • runメソッドでは、出力を生成する処理を記述する
    • outputメソッドがFalse(存在しない)の場合のみ実行される

Task1では、依存するタスクが無いため、outputとrunのみを定義しています。

  • outputは、luigi.LocalTarget("intermediate/task1.txt")でintermediate/task1.txtファイルを出力先に指定
    • LocalTarget以外にも、luigi.contrib.s3.S3Targetやluigi.contrib.bigquery.BigqueryTargetなどが提供されています (参考)
  • runは、出力先をouputメソッドで取得し、文字列(This file was generated by Task1 at Sat Mar 31 13:47:18 2018.)を書き込んでいます

Task2では、requires、output、putを定義してます。

  • requiresでは、依存するTask1を生成して返すことで、先にTask1を実行させるようにします
  • runでは、Task1の出力をinputメソッドで取得しています

上を実行すると、task1.txtとtask2.txtが生成されます。

.
├── Intermediate
│   └── task1.txt
├── input
├── main.py
└── output
    └── task2.txt

コンソールの表示からは以下の順番で実行されていることがわかります。また、Luigi Execution Summaryからは2つのタスクが正常に完了したことも確認できます。

DEBUG: Checking if tasks.Task2() is complete
Task2: output
Task2: requires
DEBUG: Checking if tasks.Task1() is complete
Task1: output
Task1: run
Task1: output
Task2: requires
Task1: output
Task2: run
Task2: requires
Task1: output
Task2: output
...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 tasks.Task1()
    - 1 tasks.Task2()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ここで、Task2に問題がありtask2.txtのみが出力されなかった場合を想定して、task2.txtのみを削除します。main.pyを再実行すると、Task2のみが実行されます。

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 tasks.Task1()
* 1 ran successfully:
    - 1 tasks.Task2()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

一方で、task1.txtのみを削除して再実行しても、task2.txtは存在するため何も行われません(最初から再作成されたりしません)。

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 tasks.Task2()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

機械学習やS3と組み合わせたサンプル

応用例として、Irisデータセットを使って機械学習した結果をS3にアップロードする3つのタスクを定義します。

  1. iris_tasks.DownloadCsvTask: Irisデータセット(CSVファイル)をダウンロードして、intermediate/original.csvに出力する
  2. iris_tasks.CreateModelTask: original.csvを読み込み、SVMで機械学習してモデルを作り、intermediate/model.pklに出力する
  3. iris_tasks.UploadS3Task: model.pklを読み込み、S3にアップロードする

設定ファイルはフォルダ構成は以下のとおりです。

.
├── intermediate
├── luigi.cfg
├── main.py
└── output

新たにluigi.cfgという設定ファイルを追加しています。
Luigiでは、実行時のカレントパスに"luigi.cfg"を配置するか、または、"LUIGI_CONFIG_PATH"でファイルパスを指定することで、実行時に参照できるパラメータを設定できます。

luigi.cfgは以下のように記述しています。
タスクの名前(例えば[iris_tasks.DownloadCsvTask])を指定し、パラメータ(iris_tasks.DownloadCsvTaskの場合は、source_urlintermediate_original_path)を渡します。
[s3]はS3のアップロードで必要な認証情報を渡しています。このあと触れるS3Targetでは、[s3]で渡された認証情報を使ってS3にアクセスします。

[iris_tasks.DownloadCsvTask]
source_url=https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
intermediate_original_path=intermediate/original.csv

[iris_tasks.CreateModelTask]
intermediate_model_path=intermediate/model.pkl

[iris_tasks.UploadS3Task]
s3_model_path=s3://test-s3-path/model.pkl

[s3]
aws_access_key_id=XXXXXXXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

それでは実装を示します。こちらもpython main.pyで実行できます。

import luigi
from luigi.contrib.s3 import S3Target # pip install boto
import requests
import pandas
import pickle
import shutil
from sklearn import svm


# irisデータをダウンロードしてCSVファイルに出力
class DownloadCsvTask(luigi.Task):
    task_namespace = 'iris_tasks'

    source_url = luigi.Parameter()
    intermediate_original_path = luigi.Parameter()

    def run(self):
        with self.output().open("w") as target:
            response = requests.get(self.source_url)
            target.write(response.text)

    def output(self):
        return luigi.LocalTarget(self.intermediate_original_path)


# irisの分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力
class CreateModelTask(luigi.Task):
    task_namespace = 'iris_tasks'

    intermediate_model_path = luigi.Parameter()

    def requires(self):
        return DownloadCsvTask()

    def run(self):
        with open(self.output().path, "wb") as target:
            # self.output().open("wb") ではエラーが発生した
            # https://github.com/spotify/luigi/issues/1647

            data = pandas.read_csv(self.input().path, header=None)
            x = data.iloc[:, [0, 1, 2, 3]]
            y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2})

            model = svm.SVC()
            model.fit(x, y)

            pickle.dump(model, target)

    def output(self):
        # バイナリファイルを出力する場合は、formatにNopFormatを指定
        return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat)


# pklファイルをS3のバケットにアップロード
class UploadS3Task(luigi.Task):
    task_namespace = 'iris_tasks'

    s3_model_path = luigi.Parameter()

    def requires(self):
        return CreateModelTask()

    def run(self):
        with self.output().open("w") as target:
            # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー
            shutil.copy(self.input().path, target.tmp_path)
        
    def output(self):
        # S3Targetでアップロードできる(botoのインストールが必要)
        return S3Target(self.s3_model_path)


if __name__ == '__main__':
    luigi.run(['iris_tasks.UploadS3Task', '--workers', '1', '--local-scheduler'])

最初のDownloadCsvTaskでは、irisのデータをダウンロードしてCSVファイルでローカルに保存します。

luigi.cfgで渡されたパラメータですが、変数名 = luigi.Parameter()とすることで、変数名と一致するパラメータの値が得られます。 ここでは、[iris_tasks.DownloadCsvTask]からsource_urlintermediate_original_pathの値を取り出しています。

class DownloadCsvTask(luigi.Task):
    task_namespace = 'iris_tasks'

    source_url = luigi.Parameter()
    intermediate_original_path = luigi.Parameter()

    def run(self):
        with self.output().open("w") as target:
            response = requests.get(self.source_url)
            target.write(response.text)

    def output(self):
        return luigi.LocalTarget(self.intermediate_original_path)

2番目にCreateModelTaskが実行されます。前段のDownloadCsvTaskからoriginal.csvを受け取り、分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力しています。

これまでと同様LocalTargetを使っていますが、バイナリで出力する場合は、formatオプションにNorFormatを指定します。 ちなみに、runメソッドでファイルをオープンするときは、open(self.output().path, "wb")としてます(バージョンの問題なのか self.output().open("wb")ではだめでした)。

class CreateModelTask(luigi.Task):
    task_namespace = 'iris_tasks'

    intermediate_model_path = luigi.Parameter()

    def requires(self):
        return DownloadCsvTask()

    def run(self):
        with open(self.output().path, "wb") as target:
            # self.output().open("wb") ではエラーが発生した
            # https://github.com/spotify/luigi/issues/1647

            data = pandas.read_csv(self.input().path, header=None)
            x = data.iloc[:, [0, 1, 2, 3]]
            y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2})

            model = svm.SVC()
            model.fit(x, y)

            pickle.dump(model, target)

    def output(self):
        # バイナリファイルを出力する場合は、formatにNopFormatを指定
        return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat)

最後のUploadS3Taskでは、作成したpklファイルをS3のパスにアップロードします。

S3に出力する場合は、S3Targetを使うと自然に出力できます。
botoも事前にインストールしておく必要があります(内部的にはS3Clientでアクセスしています)。また、luigi.cfgの[s3]で渡したaws_access_key_idaws_secret_access_keyの値が認証情報として使われます。

class UploadS3Task(luigi.Task):
    task_namespace = 'iris_tasks'

    s3_model_path = luigi.Parameter()

    def requires(self):
        return CreateModelTask()

    def run(self):
        with self.output().open("w") as target:
            # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー
            shutil.copy(self.input().path, target.tmp_path)
        
    def output(self):
        # S3Targetでアップロードできる(botoのインストールが必要)
        return S3Target(self.s3_model_path)

まとめ

ジョブスケジューラを使うまででもないけど、データの受け渡しを伴う処理だったり、時間がかかる処理を分けたい場合に使えそうですね。