最近はお仕事でKubeflow Pipelinesを触り始めています。
PythonでDAGを定義し、SDK (KFP) を使ってArgo Workflowのマニフェストを出力して、それをKubeflowにアップロードしてパイプラインを作る、という流れで開発しています。
今回は、タスクの一部を並列化する方法について備忘録としてまとめます。PythonとSDK (KFP) は以下のバージョンを用いています。
$ python --version Python 3.8.5 $ pip list | grep kfp kfp 0.5.1
サンプルパイプライン
サンプルとして、1〜nの数字をm個ずつに分割してそれぞれで合計を求めるパイプラインを考えます。ユーザから与えられるパラメータはnとmです。
上を実現するために preprocessで分割 -> processで合計 という簡単な2段階のパイプラインを実装したものが、以下コードです (ここではmain.py) 。次に$ python main.py
で実行することでマニフェストを生成してKubeflowにアップロードします。
import kfp from kfp import compiler, dsl, components def _preprocess(n: int, m: int) -> list: num_lists = [] for num in range(0, n, m): num_lists.append([i for i in range(num + 1, num + m + 1) if i <= n]) return num_lists def _process(num_lists: list) -> list: return [sum(nums) for nums in num_lists] @kfp.dsl.pipeline(name="serial-pipeline") def pipeline(n: int, m: int) -> None: image = "python:3.8-alpine" preprocess_func = components.func_to_container_op(_preprocess, base_image=image) preprocess_op = preprocess_func(n, m) process_func = components.func_to_container_op(_process, base_image=image) process_op = process_func(preprocess_op.output) process_op.after(preprocess_op) if __name__ == "__main__": compiler.Compiler().compile(pipeline, "pipeline.yaml")
マニフェストをアップロードすると、以下のパイプラインが定義されます。このパイプラインを n=100, m=30 で実行すると [465, 1365, 2265, 955]
がprocessから出力されます。
processの並列化
2段目のprocessを並列化していきます。
main.pyの実装を以下の用に変更します。キモはParallelForで、これを使うとprprocessの出力がprocessの入力にwithParamで渡されるマニフェストへ変換されます (https://argoproj.github.io/argo/examples/#loopsも参照) 。これによって、今回のように入力パラメータによって並列数を変えることができます。
- _preprocessではJSON文字列にする
- withParamで解釈できるフォーマットにしないといけないため
- コンテキスト (with) の範囲ならprocessからさらに他のオペレータを連結させることもできます
import kfp from kfp import compiler, dsl, components def _preprocess(n: int, m: int) -> str: import json num_lists = [] for num in range(0, n, m): num_lists.append([str(i) for i in range(num + 1, num + m + 1) if i <= n]) return json.dumps([",".join(nums) for nums in num_lists]) def _process(nums: str) -> int: return sum([int(num) for num in nums.split(",")]) @kfp.dsl.pipeline(name="parallel-pipeline") def pipeline(n: int, m: int) -> None: image = "python:3.8-alpine" preprocess_func = components.func_to_container_op(_preprocess, base_image=image) preprocess_op = preprocess_func(n, m) with dsl.ParallelFor(preprocess_op.output) as nums: process_func = components.func_to_container_op(_process, base_image=image) process_op = process_func(nums) process_op.after(preprocess_op) return process_op.output if __name__ == "__main__": compiler.Compiler().compile(pipeline, "pipeline.yaml")
Kubeflowにアップロードすると、loopが入って3段のように見えます (左図) 。同じく n=100, m=30 で実行すると、processが4並列で実行されることが確認できます (右図) 。
並列後の集約
ここまでできたらあとは結果を集約 (fan-in) するタスクを組みたくなりますが、残念ながら2020/8時点のKFP 0.5.1では簡単にはできないようです。対応を待ちましょう。
- 参考: Very difficult to collect results of a ParallelFor over a dynamic number of items · Issue #3412 · kubeflow/pipelines · GitHub
- Argo workflowではfan-inできるようです (StackOverflow) ので、生成されたマニフェストファイルを直接編集すればできるのかな...? (未確認)