Kubeflow Pipelines SDKを用いた並列処理の実装

最近はお仕事でKubeflow Pipelinesを触り始めています。
PythonでDAGを定義し、SDK (KFP) を使ってArgo Workflowのマニフェストを出力して、それをKubeflowにアップロードしてパイプラインを作る、という流れで開発しています。

今回は、タスクの一部を並列化する方法について備忘録としてまとめます。PythonとSDK (KFP) は以下のバージョンを用いています。

$ python --version
Python 3.8.5

$ pip list | grep kfp
kfp                      0.5.1

サンプルパイプライン

サンプルとして、1〜nの数字をm個ずつに分割してそれぞれで合計を求めるパイプラインを考えます。ユーザから与えられるパラメータはnとmです。

上を実現するために preprocessで分割 -> processで合計 という簡単な2段階のパイプラインを実装したものが、以下コードです (ここではmain.py) 。次に$ python main.pyで実行することでマニフェストを生成してKubeflowにアップロードします。

import kfp
from kfp import compiler, dsl, components

def _preprocess(n: int, m: int) -> list:
    num_lists = []
    for num in range(0, n, m):
        num_lists.append([i for i in range(num + 1, num + m + 1) if i <= n])
    return num_lists

def _process(num_lists: list) -> list:
    return [sum(nums) for nums in num_lists]

@kfp.dsl.pipeline(name="serial-pipeline")
def pipeline(n: int, m: int) -> None:
    image = "python:3.8-alpine"

    preprocess_func = components.func_to_container_op(_preprocess, base_image=image)
    preprocess_op = preprocess_func(n, m)

    process_func = components.func_to_container_op(_process, base_image=image)
    process_op = process_func(preprocess_op.output)

    process_op.after(preprocess_op)

if __name__ == "__main__":
    compiler.Compiler().compile(pipeline, "pipeline.yaml")

マニフェストをアップロードすると、以下のパイプラインが定義されます。このパイプラインを n=100, m=30 で実行すると [465, 1365, 2265, 955] がprocessから出力されます。

processの並列化

2段目のprocessを並列化していきます。

main.pyの実装を以下の用に変更します。キモはParallelForで、これを使うとprprocessの出力がprocessの入力にwithParamで渡されるマニフェストへ変換されます (https://argoproj.github.io/argo/examples/#loopsも参照) 。これによって、今回のように入力パラメータによって並列数を変えることができます。

  • _preprocessではJSON文字列にする
    • withParamで解釈できるフォーマットにしないといけないため
  • コンテキスト (with) の範囲ならprocessからさらに他のオペレータを連結させることもできます
import kfp
from kfp import compiler, dsl, components

def _preprocess(n: int, m: int) -> str:
    import json

    num_lists = []
    for num in range(0, n, m):
        num_lists.append([str(i) for i in range(num + 1, num + m + 1) if i <= n])
    return json.dumps([",".join(nums) for nums in num_lists])

def _process(nums: str) -> int:
    return sum([int(num) for num in nums.split(",")])

@kfp.dsl.pipeline(name="parallel-pipeline")
def pipeline(n: int, m: int) -> None:
    image = "python:3.8-alpine"

    preprocess_func = components.func_to_container_op(_preprocess, base_image=image)
    preprocess_op = preprocess_func(n, m)

    with dsl.ParallelFor(preprocess_op.output) as nums:
        process_func = components.func_to_container_op(_process, base_image=image)
        process_op = process_func(nums)
        process_op.after(preprocess_op)

    return process_op.output

if __name__ == "__main__":
    compiler.Compiler().compile(pipeline, "pipeline.yaml")

Kubeflowにアップロードすると、loopが入って3段のように見えます (左図) 。同じく n=100, m=30 で実行すると、processが4並列で実行されることが確認できます (右図) 。

並列後の集約

ここまでできたらあとは結果を集約 (fan-in) するタスクを組みたくなりますが、残念ながら2020/8時点のKFP 0.5.1では簡単にはできないようです。対応を待ちましょう。