Python: Luigiでデータパイプラインを作る

バッチ処理の実装にLuigiを使う機会があり、少し調べて整理しました。
irisデータセットをダウンロードしてきて、scikit-learnで学習したモデルをS3にアップロードする、簡単なサンプルも作ります。

Luigi

Luigiは、データパイプラインを記述するためのPythonフレームワークです。

github.com

特徴

  • タスク間の依存関係を定型的に定義できる
  • 失敗した(アウトプットが生成できなかった)タスクから再開できる
  • スケジューリングはできない
    • スタートキックは別の誰かが行う必要がある

ユースケース

処理間に依存関係があり、各処理が複雑な場合に役立ちます。

  • 例えば、データの前処理やパラメータ最適化などを含む複雑な処理フローを構造化して実装したい
  • 例えば、インプットとなるデータ量が膨大なので、適切に中間生成物を作りながら処理したい

一方で、ジョブのスケジューリングは別の機構(cronやAWS Data PipelineやAirFlowなど)が必要となります。

そうしたジョブスケジューラから呼び出され、依存関係があるまとまった処理を行うバッチ、という位置づけが良さそうです。

基本的な使い方

はじめにluigiをインストールします。

pip install luigi

最初に簡単な2つのタスクを定義して、動きを見ていきます。

  1. Task1: intermediate/task1.txtにファイルを出力
  2. Task2: intermediate/task2.txt(Task1の出力)を読み込み、output/task2.txtにファイル出力

フォルダ構造は以下のとおりです。main.pyが今回実装するプログラムです。python main.pyで実行できます。

.
├── Intermediate
├── main.py
└── output

main.pyの実装を示します。

import luigi
import time


class Task1(luigi.Task):
    task_namespace = 'tasks'

    def run(self):
        print("Task1: run")
        with self.output().open("w") as target:
            target.write(f"This file was generated by Task1 at {time.asctime()}.")

    def output(self):
        print("Task1: output")
        return luigi.LocalTarget("intermediate/task1.txt")


class Task2(luigi.Task):
    task_namespace = 'tasks'

    def requires(self):
        print("Task2: requires")
        return Task1()

    def run(self):
        print("Task2: run")
        with self.input().open("r") as intermediate, self.output().open("w") as target:
            task1_text = intermediate.read()

            target.write(f"{task1_text}\n")
            target.write(f"This file was generated by Task2 at {time.asctime()}.")

    def output(self):
        print("Task2: output")
        return luigi.LocalTarget("output/task2.txt")


if __name__ == '__main__':
    luigi.run(['tasks.Task2', '--workers', '1', '--local-scheduler'])

Luigiでは、入力・処理・出力の一連の処理をTaskクラス(あるいはその継承クラス)を継承することで定義します。
Taskクラスでは、requires、output、runの3つのメソッドを主に実装します。

  • requiresメソッドでは、前段のタスクを記述し、依存関係を定義する
  • outputメソッドでは、出力先を定義する
  • runメソッドでは、出力を生成する処理を記述する
    • outputメソッドがFalse(存在しない)の場合のみ実行される

Task1では、依存するタスクが無いため、outputとrunのみを定義しています。

  • outputは、luigi.LocalTarget("intermediate/task1.txt")でintermediate/task1.txtファイルを出力先に指定
    • LocalTarget以外にも、luigi.contrib.s3.S3Targetやluigi.contrib.bigquery.BigqueryTargetなどが提供されています (参考)
  • runは、出力先をouputメソッドで取得し、文字列(This file was generated by Task1 at Sat Mar 31 13:47:18 2018.)を書き込んでいます

Task2では、requires、output、putを定義してます。

  • requiresでは、依存するTask1を生成して返すことで、先にTask1を実行させるようにします
  • runでは、Task1の出力をinputメソッドで取得しています

上を実行すると、task1.txtとtask2.txtが生成されます。

.
├── Intermediate
│   └── task1.txt
├── input
├── main.py
└── output
    └── task2.txt

コンソールの表示からは以下の順番で実行されていることがわかります。また、Luigi Execution Summaryからは2つのタスクが正常に完了したことも確認できます。

DEBUG: Checking if tasks.Task2() is complete
Task2: output
Task2: requires
DEBUG: Checking if tasks.Task1() is complete
Task1: output
Task1: run
Task1: output
Task2: requires
Task1: output
Task2: run
Task2: requires
Task1: output
Task2: output
...
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 tasks.Task1()
    - 1 tasks.Task2()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

ここで、Task2に問題がありtask2.txtのみが出力されなかった場合を想定して、task2.txtのみを削除します。main.pyを再実行すると、Task2のみが実行されます。

===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 tasks.Task1()
* 1 ran successfully:
    - 1 tasks.Task2()

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

一方で、task1.txtのみを削除して再実行しても、task2.txtは存在するため何も行われません(最初から再作成されたりしません)。

===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 present dependencies were encountered:
    - 1 tasks.Task2()

Did not run any tasks
This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====

機械学習やS3と組み合わせたサンプル

応用例として、Irisデータセットを使って機械学習した結果をS3にアップロードする3つのタスクを定義します。

  1. iris_tasks.DownloadCsvTask: Irisデータセット(CSVファイル)をダウンロードして、intermediate/original.csvに出力する
  2. iris_tasks.CreateModelTask: original.csvを読み込み、SVMで機械学習してモデルを作り、intermediate/model.pklに出力する
  3. iris_tasks.UploadS3Task: model.pklを読み込み、S3にアップロードする

設定ファイルはフォルダ構成は以下のとおりです。

.
├── intermediate
├── luigi.cfg
├── main.py
└── output

新たにluigi.cfgという設定ファイルを追加しています。
Luigiでは、実行時のカレントパスに"luigi.cfg"を配置するか、または、"LUIGI_CONFIG_PATH"でファイルパスを指定することで、実行時に参照できるパラメータを設定できます。

luigi.cfgは以下のように記述しています。
タスクの名前(例えば[iris_tasks.DownloadCsvTask])を指定し、パラメータ(iris_tasks.DownloadCsvTaskの場合は、source_urlintermediate_original_path)を渡します。
[s3]はS3のアップロードで必要な認証情報を渡しています。このあと触れるS3Targetでは、[s3]で渡された認証情報を使ってS3にアクセスします。

[iris_tasks.DownloadCsvTask]
source_url=https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
intermediate_original_path=intermediate/original.csv

[iris_tasks.CreateModelTask]
intermediate_model_path=intermediate/model.pkl

[iris_tasks.UploadS3Task]
s3_model_path=s3://test-s3-path/model.pkl

[s3]
aws_access_key_id=XXXXXXXXXXXXXXXXXXXX
aws_secret_access_key=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

それでは実装を示します。こちらもpython main.pyで実行できます。

import luigi
from luigi.contrib.s3 import S3Target # pip install boto
import requests
import pandas
import pickle
import shutil
from sklearn import svm


# irisデータをダウンロードしてCSVファイルに出力
class DownloadCsvTask(luigi.Task):
    task_namespace = 'iris_tasks'

    source_url = luigi.Parameter()
    intermediate_original_path = luigi.Parameter()

    def run(self):
        with self.output().open("w") as target:
            response = requests.get(self.source_url)
            target.write(response.text)

    def output(self):
        return luigi.LocalTarget(self.intermediate_original_path)


# irisの分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力
class CreateModelTask(luigi.Task):
    task_namespace = 'iris_tasks'

    intermediate_model_path = luigi.Parameter()

    def requires(self):
        return DownloadCsvTask()

    def run(self):
        with open(self.output().path, "wb") as target:
            # self.output().open("wb") ではエラーが発生した
            # https://github.com/spotify/luigi/issues/1647

            data = pandas.read_csv(self.input().path, header=None)
            x = data.iloc[:, [0, 1, 2, 3]]
            y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2})

            model = svm.SVC()
            model.fit(x, y)

            pickle.dump(model, target)

    def output(self):
        # バイナリファイルを出力する場合は、formatにNopFormatを指定
        return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat)


# pklファイルをS3のバケットにアップロード
class UploadS3Task(luigi.Task):
    task_namespace = 'iris_tasks'

    s3_model_path = luigi.Parameter()

    def requires(self):
        return CreateModelTask()

    def run(self):
        with self.output().open("w") as target:
            # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー
            shutil.copy(self.input().path, target.tmp_path)
        
    def output(self):
        # S3Targetでアップロードできる(botoのインストールが必要)
        return S3Target(self.s3_model_path)


if __name__ == '__main__':
    luigi.run(['iris_tasks.UploadS3Task', '--workers', '1', '--local-scheduler'])

最初のDownloadCsvTaskでは、irisのデータをダウンロードしてCSVファイルでローカルに保存します。

luigi.cfgで渡されたパラメータですが、変数名 = luigi.Parameter()とすることで、変数名と一致するパラメータの値が得られます。 ここでは、[iris_tasks.DownloadCsvTask]からsource_urlintermediate_original_pathの値を取り出しています。

class DownloadCsvTask(luigi.Task):
    task_namespace = 'iris_tasks'

    source_url = luigi.Parameter()
    intermediate_original_path = luigi.Parameter()

    def run(self):
        with self.output().open("w") as target:
            response = requests.get(self.source_url)
            target.write(response.text)

    def output(self):
        return luigi.LocalTarget(self.intermediate_original_path)

2番目にCreateModelTaskが実行されます。前段のDownloadCsvTaskからoriginal.csvを受け取り、分類モデル(SVM)を作り、pickleでシリアライズしてファイル出力しています。

これまでと同様LocalTargetを使っていますが、バイナリで出力する場合は、formatオプションにNorFormatを指定します。 ちなみに、runメソッドでファイルをオープンするときは、open(self.output().path, "wb")としてます(バージョンの問題なのか self.output().open("wb")ではだめでした)。

class CreateModelTask(luigi.Task):
    task_namespace = 'iris_tasks'

    intermediate_model_path = luigi.Parameter()

    def requires(self):
        return DownloadCsvTask()

    def run(self):
        with open(self.output().path, "wb") as target:
            # self.output().open("wb") ではエラーが発生した
            # https://github.com/spotify/luigi/issues/1647

            data = pandas.read_csv(self.input().path, header=None)
            x = data.iloc[:, [0, 1, 2, 3]]
            y = data.iloc[:, 4].replace({'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2})

            model = svm.SVC()
            model.fit(x, y)

            pickle.dump(model, target)

    def output(self):
        # バイナリファイルを出力する場合は、formatにNopFormatを指定
        return luigi.LocalTarget(self.intermediate_model_path, format=luigi.format.NopFormat)

最後のUploadS3Taskでは、作成したpklファイルをS3のパスにアップロードします。

S3に出力する場合は、S3Targetを使うと自然に出力できます。
botoも事前にインストールしておく必要があります(内部的にはS3Clientでアクセスしています)。また、luigi.cfgの[s3]で渡したaws_access_key_idaws_secret_access_keyの値が認証情報として使われます。

class UploadS3Task(luigi.Task):
    task_namespace = 'iris_tasks'

    s3_model_path = luigi.Parameter()

    def requires(self):
        return CreateModelTask()

    def run(self):
        with self.output().open("w") as target:
            # 存在するファイルをアップロードする場合は、tmp_pathにファイルをコピー
            shutil.copy(self.input().path, target.tmp_path)
        
    def output(self):
        # S3Targetでアップロードできる(botoのインストールが必要)
        return S3Target(self.s3_model_path)

まとめ

ジョブスケジューラを使うまででもないけど、データの受け渡しを伴う処理だったり、時間がかかる処理を分けたい場合に使えそうですね。

C# クエリストリング(?var=hoge&...)を作る

C#でクエリストリングを作る方法の備忘録です。

クエリストリングは、URLのパスの後ろに?変数名1=値1&変数名2=値2&...といった形で任意の値が渡される文字列です。例えば以下のような文字列です。

http://ohke.hateblo.jp/search?q=Python&page=1504879200

LINQで文字列を連結することもできますが、' '(スペース)や'&'(アンパサンド)などの特殊記号をエスケープする必要があったりするため、自前での実装は避けたほうがいいです。

かわりにSystem.Web.HttpUtility.ParseQueryString.aspx)メソッドを使うと、安全かつ容易にクエリストリングを生成できます。

  • ' 'や'&'などはパーセントエンコーディングされています
var query = System.Web.HttpUtility.ParseQueryString("");

query.Add("name", "Tanaka, Ichiro");
query.Add("childs", "jiro&saburo");

var queryString = query.ToString();

Console.WriteLine(queryString);
// name=Tanaka%2c+Ichiro&childs=jiro%26saburo

生成されたクエリストリングをSystem.UriBuilder.aspx)クラスに渡せば、URIも一発生成できます。

var uriBuilder = new System.UriBuilder("example.host.name") {
    Query = query.ToString()
};

Console.WriteLine(uriBuilder.Uri);
// http://example.host.name/?name=Tanaka%2c+Ichiro&childs=jiro%26saburo

Python: 回帰モデルで市場反応分析

これまで緑本などで学んできた統計モデルを、マーケティングに応用するための勉強を行っています。

今回は市場反応分析を線形回帰モデルとポアソン回帰モデルで行います。
市場反応分析に関する理論や使用するデータは、マーケティングの統計モデル (統計解析スタンダード)の第3章「消費者の市場反応のモデル化」を参考にさせていただいてます。

マーケティングの統計モデル (統計解析スタンダード)

実装はこちらです。

statistical_modeling_for_marketing/chapter3.ipynb at master · ohke/statistical_modeling_for_marketing · GitHub

市場反応分析

マーケティング施策は、顧客の行動(購入や予約など)の変化を促すために行われます。

  • 例えば、ある商品を値下げすると、その商品の販売個数が増える
  • 例えば、ある商品を山積み陳列すると、その商品の販売個数は増えるが、類似商品の販売個数は減る ...など

施策を効率的に実施するためには、上のように施策の効果を定性的に捉えるだけではなく、定量化する必要があります。

  • ある商品をX%値下げすると、その商品の販売個数がY%増える
  • ある商品を山積み陳列すると、その商品の販売個数はX%増えるが、類似商品の販売個数はY%減る

このように、施策を受けた顧客の行動変化を定量的にモデル化することを市場反応分析といいます。

価格と陳列によるPI値の変化を分析

それでは、擬似的なデータを使って、具体的に市場反応分析をしていきます。データはこちらからダウンロードできます。

最初の事例として、2つのブランドの醤油(醤油Aと醤油Bとする)を販売するケースを考えます。

両方の商品について「価格を変更する」「山積み陳列する」というそれぞれ2つの施策が組み合わせて行われています。
上の施策を実施した結果、醤油AのPI値の対数がどのように変化するのかをモデル化します。つまり醤油AのPI値の対数が目的変数となります。

データを概観する

まずはデータを概観します。

  • LogPI_Aが醤油AのPI値の対数
    • PI値とは、その商品の販売点数÷客数で計算される値で、その商品の人気度・支持度を表す指標で、
    • LogPI_Aは、  \log \frac{\mbox{販売点数}\times \mbox{客数}}{1000} で計算されています
  • LogPriceIndex_AとLogPriceIndex_Bはそれぞれ、  \log \frac{\mbox{商品Aの売価}}{\mbox{商品Aの期間最大売価}}  \log \frac{\mbox{商品Bの売価}}{\mbox{商品Bの期間最大売価}} で計算される値で、ここでは価格掛率と呼びます
    • 値が小さいほど安売りされていることを表す
  • Display_AとDisplay_Bは、それぞれの商品が山積みされていれば1となるダミー変数

f:id:ohke:20180324132509p:plain

醤油Aのと価格掛率の相関を見てみます。

  • 醤油Aが安売りされている(価格掛率LogPriceIndex_Aが小さい)と、PI値が大きくなる
  • 醤油Bが安売りされている(価格掛率LogPriceIndex_Bが小さい)と、PI値が大きくなる
    • 醤油Aと醤油Bが競合関係にあることがわかります

f:id:ohke:20180324134021p:plain

山積み陳列の有無による醤油AのPI値の分布を箱ひげ図で見てみます。

  • 醤油Aが山積み陳列される(Display_A=1)と、PI値が大きくなる
  • 醤油Bが山積み陳列される(Display_B=1)と、PI値が小さくなる

f:id:ohke:20180324134518p:plain

2標本t検定で醤油Bの山積み陳列の効果を確認

上の箱ひげ図では、醤油Aの山積み陳列によるPI値の増加ははっきり現れていますが、醤油Bの山積み陳列による影響はそれより弱く、誤差という見方ができるかもしれません。

モデル化の前に、醤油Bの山積み陳列によるPI値の変化が誤差ではない(起きにくい)ことを、念のため検定で確認しておきます。ここでは、価格掛率は一旦無視します。なおこの判断は誤りであることが後にわかります。線形回帰モデルを作る時にそれも見ていきます。

なお、検定では統計学入門 (基礎統計学?)の第11・12章を参考にしています。

平均値の差が有意かどうかをt検定で確認します。2つのグループで分散が異なっている(Display_B=1では0.765、Display_B=0では0.910)ため、ウェルチのt検定を行います。

まずはt統計量を計算します。  s が不偏分散、  n がデータ数です。下付き添字が1ならDisplay_B=1、0ならDisplay_B=0を表します。

$$ t=\frac{\overline{PI_1} - \overline{PI_0}}{\sqrt{\frac{s_1^2}{n_1}+\frac{s_0^2}{n_0}}} $$

t分布の自由度  \nu は以下の式で近似できます。 \nu から最も近い整数  \nu^{*} を選び、自由度  \nu^{*} のt分布の5%の下側信頼限界の  t_{0.05}(\nu^{*}) 値を算出します。上で求めた値がこの値よりも小さければ(つまり  t \mbox{<} t_{0.05}(\nu^{*}) )、有意差があると判定します。これは左片側検定です。

$$ \nu=\frac{(\frac{s_1^2}{n_1}+\frac{s_0^2}{n_0})^2}{\frac{(s_1^2 / n_1)^2}{n_1-1}+\frac{(s_0^2 / n_0)^2}{n_0-1}} $$

Pythonではこんな感じで計算できます。

m = len(treatment_A)
n = len(control_A)
s_1 = treatment_A.var()
s_2 = control_A.var()

t = (treatment_A.mean() - control_A.mean()) / math.sqrt(s_1**2 / m + s_2**2 / n)
print('t:', t)

# 自由度νを近似的に計算
nu = (s_1 ** 2 / m + s_2**2 / n)**2 / ((s_1**2 / m)**2 / (m-1) + (s_2**2 / n)**2 / (n-1))
nu_star = round(nu) # 整数ν*に丸める

print('-t_0.95(ν*):', -stats.t.ppf(0.95, nu_star))

 t\mbox{<}-t_{0.05}(\nu^{*}) \iff -8.47\mbox{<}-1.65 となるため、グループ間の平均値に有意差があることも確認できました。

線形回帰でモデル化

PI値は連続変数なので、正規分布を仮定した線形回帰でモデル化します。

$$ LogPIA_t=\beta_0+LogPriceIndexA_t\beta_1+LogPriceIndexB_t\beta_2+DisplayA_t\beta_3+DisplayB_t\beta_4+\epsilon_t $$

import statsmodels.api as sm

model = sm.OLS(data['LogPI_A'],
               sm.add_constant(data[['LogPriceIndex_A', 'LogPriceIndex_B', 'Display_A', 'Display_B']]))
result = model.fit()

結果は以下のとおりです。いずれも各施策の効果が係数となっています。

  • LogPriceIndex_Aの係数が負数(-5.02)、Display_Aの係数が正(0.774)となっており、醤油Aの販売にプラスに働いていることが数値化されています
  • LogPriceIndex_Bの傾きは正数(1.93)で、競合商品の値引きの影響を受けることが確認できます

f:id:ohke:20180324152547p:plain

一方でDisplay_Bの傾きがわずかですが正数(0.0374)となっています。95%信頼区間では0を跨いでいるので負になる可能性もありますが、検定とは反した結果となっています。
LogPriceIndex_BとDisplay_Bの相関が高い(-0.641)にも関わらず、LogPriceIndex_Bを無視して検定したため、Display_Bの影響を過大評価してしまいました。安売りする商品を山積み陳列するというのは、相関係数的にも直感的にもよくあるパターンに見えます。

  • 他の条件が異なるグループ間での比較にt検定を使うべきではありません
    • LogPriceIndex_Bも揃えたグループ間で平均値を比較すべきです
  • こうした複数の変数がからむ場合、初めから回帰モデルに落とした方が良いかと思います

モデルにもLogPriceIndex_B×Display_Bの交互作用項を追加する必要がありそうです。
LogPriceIndex_B×Display_Bを交互作用項(LogPriceIndex_Display_B)としてパラメータに追加して線形回帰した結果が以下です。Display_Bの係数は-0.330(95%信頼区間は[-0.550, -0.110])で、Display_B単体では醤油Aの販売に負に影響していることがわかります。

f:id:ohke:20180326083747p:plain

価格と陳列による販売個数の変化を分析

同じく醤油Aと醤油Bを販売する事例について考えますが、今回は販売個数(つまりカウントデータ)のモデル化を行います。

データを概観する

同様にデータを概観します。

  • Sale_Unit_Aは醤油Aの販売個数
  • LogPriceIndex_AとLogPriceIndex_Bは、先の事例と同じ価格掛率ですが、今回は対数化されていません(0以上1以下の値域をとる)
  • Display_AとDisplay_Bも同じく山積み陳列の有無
  • Visitorsは来店者数

f:id:ohke:20180324155858p:plain

販売個数と価格掛率・来店者数の分布をプロットします。

  • 醤油Aが安くなれば販売個数は増え、醤油Bが安くなれば販売個数が減る傾向が見えます
  • 来店者数と販売個数も緩い正相関がありそう

f:id:ohke:20180324160455p:plain

山積み陳列の有無毎に、販売個数を箱ひげ図でプロットします。ここでもやはり、競合商品(醤油B)を山積み陳列した時は醤油Aの販売個数が減る傾向が伺えます。

f:id:ohke:20180324160506p:plain

ポアソン回帰でモデル化

販売個数はカウントデータなので、ポアソン回帰(+対数リンク関数)でモデル化します。ポアソン分布の平均値  \mu は以下で計算します。

  • Visitorsはオフセット(傾きを持たない定数項)として扱っています

$$ \log(\mu_t)=\log(Visitors_t)+\beta_0+PriceIndexA_t\beta_1+PriceIndexB_t\beta_2+DisplayA_t\beta_3+DisplayB_t\beta_4 $$

Pythonでの実装は以下となります。

  • offsetオプションで、オフセット値を指定できます
import statsmodels.formula.api as smf

model = smf.glm('Sale_Unit_A ~ PriceIndex_A + PriceIndex_B + Display_A + Display_B',
                data=data,
                offset=np.log(data['Visitors']),
                family=sm.families.Poisson(link=sm.families.links.log))
result = model.fit()

結果は以下です。今回はDisplay_Bの傾きが負数となっており、わずかながらですが競合商品の山積み陳列が悪影響を与えることがわかります。

f:id:ohke:20180324161921p:plain