Kubeflow Pipelines SDKを用いた並列処理の実装

最近はお仕事でKubeflow Pipelinesを触り始めています。
PythonでDAGを定義し、SDK (KFP) を使ってArgo Workflowのマニフェストを出力して、それをKubeflowにアップロードしてパイプラインを作る、という流れで開発しています。

今回は、タスクの一部を並列化する方法について備忘録としてまとめます。PythonとSDK (KFP) は以下のバージョンを用いています。

$ python --version
Python 3.8.5

$ pip list | grep kfp
kfp                      0.5.1

サンプルパイプライン

サンプルとして、1〜nの数字をm個ずつに分割してそれぞれで合計を求めるパイプラインを考えます。ユーザから与えられるパラメータはnとmです。

上を実現するために preprocessで分割 -> processで合計 という簡単な2段階のパイプラインを実装したものが、以下コードです (ここではmain.py) 。次に$ python main.pyで実行することでマニフェストを生成してKubeflowにアップロードします。

import kfp
from kfp import compiler, dsl, components

def _preprocess(n: int, m: int) -> list:
    num_lists = []
    for num in range(0, n, m):
        num_lists.append([i for i in range(num + 1, num + m + 1) if i <= n])
    return num_lists

def _process(num_lists: list) -> list:
    return [sum(nums) for nums in num_lists]

@kfp.dsl.pipeline(name="serial-pipeline")
def pipeline(n: int, m: int) -> None:
    image = "python:3.8-alpine"

    preprocess_func = components.func_to_container_op(_preprocess, base_image=image)
    preprocess_op = preprocess_func(n, m)

    process_func = components.func_to_container_op(_process, base_image=image)
    process_op = process_func(preprocess_op.output)

    process_op.after(preprocess_op)

if __name__ == "__main__":
    compiler.Compiler().compile(pipeline, "pipeline.yaml")

マニフェストをアップロードすると、以下のパイプラインが定義されます。このパイプラインを n=100, m=30 で実行すると [465, 1365, 2265, 955] がprocessから出力されます。

processの並列化

2段目のprocessを並列化していきます。

main.pyの実装を以下の用に変更します。キモはParallelForで、これを使うとprprocessの出力がprocessの入力にwithParamで渡されるマニフェストへ変換されます (https://argoproj.github.io/argo/examples/#loopsも参照) 。これによって、今回のように入力パラメータによって並列数を変えることができます。

  • _preprocessではJSON文字列にする
    • withParamで解釈できるフォーマットにしないといけないため
  • コンテキスト (with) の範囲ならprocessからさらに他のオペレータを連結させることもできます
import kfp
from kfp import compiler, dsl, components

def _preprocess(n: int, m: int) -> str:
    import json

    num_lists = []
    for num in range(0, n, m):
        num_lists.append([str(i) for i in range(num + 1, num + m + 1) if i <= n])
    return json.dumps([",".join(nums) for nums in num_lists])

def _process(nums: str) -> int:
    return sum([int(num) for num in nums.split(",")])

@kfp.dsl.pipeline(name="parallel-pipeline")
def pipeline(n: int, m: int) -> None:
    image = "python:3.8-alpine"

    preprocess_func = components.func_to_container_op(_preprocess, base_image=image)
    preprocess_op = preprocess_func(n, m)

    with dsl.ParallelFor(preprocess_op.output) as nums:
        process_func = components.func_to_container_op(_process, base_image=image)
        process_op = process_func(nums)
        process_op.after(preprocess_op)

    return process_op.output

if __name__ == "__main__":
    compiler.Compiler().compile(pipeline, "pipeline.yaml")

Kubeflowにアップロードすると、loopが入って3段のように見えます (左図) 。同じく n=100, m=30 で実行すると、processが4並列で実行されることが確認できます (右図) 。

並列後の集約

ここまでできたらあとは結果を集約 (fan-in) するタスクを組みたくなりますが、残念ながら2020/8時点のKFP 0.5.1では簡単にはできないようです。対応を待ちましょう。

論文メモ: An intriguing failing of convolutional neural networks and the CoordConv solution

畳み込みニューラルネットワークが持つ座標変換の問題に着目してCoordConvを提案したAn intriguing failing of convolutional neural networks and the CoordConv solution (NeurIPS'18, arXiv) について紹介します。

@incollection{NIPS2018_8169,
title = {An intriguing failing of convolutional neural networks and the CoordConv solution},
author = {Liu, Rosanne and Lehman, Joel and Molino, Piero and Petroski Such, Felipe and Frank, Eric and Sergeev, Alex and Yosinski, Jason},
booktitle = {Advances in Neural Information Processing Systems 31},
editor = {S. Bengio and H. Wallach and H. Larochelle and K. Grauman and N. Cesa-Bianchi and R. Garnett},
pages = {9605--9616},
year = {2018},
publisher = {Curran Associates, Inc.},
url = {http://papers.nips.cc/paper/8169-an-intriguing-failing-of-convolutional-neural-networks-and-the-coordconv-solution.pdf}
}

畳み込みニューラルネットワークの座標変換

CNNでは空間表現を、密なデカルト表現から疎なピクセルベースの表現へ (またはその逆) が難しい。畳み込み層の単純なスタックも、画像分類のようなタスクには効果があるが、物体検出や生成などのタスクには不向き。

CoordConv

下図右がここで提案しているCoordConvです。チャネル方向に座標情報 (以下ではi coordinateとj coordinateの2チャネル) を連結しているのが特徴です。

  • i coordinateは、[[0 0 ... 0] [1 1 ... 1] ... [h-1 h-1 ... h-1]] (行内が同一の値) となるランク1行列
  • j coordinateは、[[0 1 ... w-1] [0 1 ... w-1] ... [0 1 ... w-1]] (列内が同一の値) となるランク1行列
  • いずれも [-1, 1] に収まるように線形スケール

f:id:ohke:20200507080418p:plain

通常の畳み込み層の性質を維持しつつ、タスクの特性に応じて追加されていた座標情報をシンプルに表現している。

  • パラメータ数は  cc'k^2 ->  (c + d)c'k^2 でd (coordinatesチャネル数で2) 分のみの増加
  • coordinatesチャネルに対する重みが0になれば、通常の畳み込み同様の並進不変性は保たれる

座標変換が表現されていることをトイタスクで確認

座標変換がうまくできることを簡単なタスクで確認。

  • タスク1: (x, y)と対応する座標が1.0となるように予測する (下図上段)
  • タスク2: (x, y)を中心とする正方形をレンダリングする (下図中段)
  • タスク3: 赤と青の2つの図形をランダムに生成した画像で学習・生成する (下図下段)

Figure 1 抜粋

タスク1の結果をプロットしたのが下図です。座標に対応する点 (ピクセル) を予測するClassificationとその逆変換のRegressionの2つを比較していますが、いずれにおいても、通常の畳み込み (中段) はほぼ予測できてないが、CoordConvでは精度100%をマークするという結果になってます。

Figure 5 抜粋

CoordConvを実践的なタスクに適用

畳み込みをCoordConvに置き換えてうまく座標変換できるようにすることで、一般のタスクも改善できるかどうかを調べる。

画像分類

並進不変性を必要とするImageNet画像分類タスクの場合、Accuracyの改善につながらなかったが、一方で悪化もなかった。

  • 追加したcoordinatesチャネルによって並進不変性が損なわれないということが確認できる

Table S2 抜粋

物体検出

物体検出では、ピクセル空間からデカルト空間のBBoxに変換する座標変換を行っており、CoordConvがマッチしそうです。
Faster R-CNNのRPNをCoordConvで置き換えたネットワークで、簡単な物体検出タスクを行ったところ、IoUが大きく改善した。

Table S3 抜粋

生成モデル

生成モデルで起こる位置のモード崩壊は、潜在空間からピクセル空間への変換の難しさにも起因していると考え、CoordConvが一助になると予想した。
タスク3について、2種類のGANモデル (通常の畳み込みバージョンとCoordConvバージョン) で学習・生成した。通常の畳み込みでは図形の中心が実際の分布よりも中央に寄りやすい傾向が見られたが、CoordConvのモデルではそれが改善していた (下図 (b)) 。1000サンプルの平均値を見ても、CoordConvの方が実際の平均に近い結果が得られた (下図 (d)) 。

  • ただし、図形間の距離の分布はどちらかというとCoordConvの方が悪くなっている (下図 (c))

f:id:ohke:20200507101257p:plain
Figure 7 抜粋

まとめ

畳み込み層に位置情報をシンプルに埋め込んだCoordConvについて紹介しました。学習・推論にかかる追加コストも小さく、また、幅広いタスクに適用・応用できる点で、アドバンテージのある提案だと思います。

Python: Parquetフォーマットファイルを入出力する (Pandasとpyarrow)

今回はテーブルデータをParquetファイルで扱う方法について2つ紹介します。

コードは以下の環境で動作確認してます。

% python --version
Python 3.8.5

% pip list   
Package         Version
--------------- -------
numpy           1.19.1
pandas          1.1.0
pip             20.2
pyarrow         1.0.0
python-dateutil 2.8.1
pytz            2020.1
setuptools      49.2.0
six             1.15.0

Apache Parquet

Apache Parquet1はApacheプロジェクトの1つで、環境に依存しない列指向のファイルフォーマットを定義・メンテナンスしています。

github.com

Parquetは以下の特徴を持ちます。詳細は https://parquet.apache.org/documentation/latest/ を参照ください。

  • 列指向フォーマットのため、行指向と比較して、圧縮効率や列に対する集計処理などにおいてアドバンテージを持つ
  • プログラミング言語、CPUアーキテクチャ等に非依存のため、利用できるプラットフォームが豊富
    • Google BigQueryやAmazon AthenaなどもデータソースフォーマットとしてParquetを選択可能
  • ネストしたカラムもエンコード可能

サポートされるデータ型

Parquetで利用できるデータ型だけ確認しておきます。文字列はBYTE_ARRAYに変換する必要があります。

  • BOOLEAN: 1 bit boolean
  • INT32: 32 bit signed ints
  • INT64: 64 bit signed ints
  • INT96: 96 bit signed ints
  • FLOAT: IEEE 32-bit floating point values
  • DOUBLE: IEEE 64-bit floating point values
  • BYTE_ARRAY: arbitrarily long byte arrays.

Pandas DataFrameを用いたParquetファイルの変換

Pandas DataFrameではParquetのファイルを入出力するためのメソッドとして、to_parquetとread_parquetが実装されています。

DataFrameをParqueに保存・ロードする簡単な例を示します。文字列や日付型、NaNを含むデータも難なく変換できてます。

import pandas as pd
from datetime import datetime


dt = datetime.now()
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["Tanaka", "Suzuki", "Sato"],
    "rating": [3.5, None, 4.2],
    "created_at": [dt, dt, dt],
})
print(df.info())
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 3 entries, 0 to 2
# Data columns (total 4 columns):
#  #   Column      Non-Null Count  Dtype
# ---  ------      --------------  -----
#  0   id          3 non-null      int64
#  1   name        3 non-null      object
#  2   rating      2 non-null      float64
#  3   created_at  3 non-null      datetime64[ns]
# dtypes: datetime64[ns](1), float64(1), int64(1), object(1)
# memory usage: 224.0+ bytes

# Parquetで保存
df.to_parquet("./df.parquet")

# Parquetからロード
loaded_df = pd.read_parquet("./df.parquet")

print(loaded_df.info())
# <class 'pandas.core.frame.DataFrame'>
# RangeIndex: 3 entries, 0 to 2
# Data columns (total 4 columns):
#  #   Column      Non-Null Count  Dtype
# ---  ------      --------------  -----
#  0   id          3 non-null      int64
#  1   name        3 non-null      object
#  2   rating      2 non-null      float64
#  3   created_at  3 non-null      datetime64[ns]
# dtypes: datetime64[ns](1), float64(1), int64(1), object(1)
# memory usage: 224.0+ bytes

print(loaded_df)
#    id    name  rating                 created_at
# 0   1  Tanaka     3.5 2020-08-15 13:33:42.224807
# 1   2  Suzuki     NaN 2020-08-15 13:33:42.224807
# 2   3    Sato     4.2 2020-08-15 13:33:42.224807

Apache Arrow

Apache ArrowもApacheプロジェクトの1つです。こちらはインメモリの列指向データフォーマットを定義し、ライブラリとして提供してます。

arrow.apache.org

Arrowの特徴はこちらです。詳細は https://arrow.apache.org/overview/ を参照ください。

  • 同じ列は同じメモリブロックに含まれるようにレイアウトする
    • SIMD (Single Instruction, Multiple Data) アーキテクチャのCPUで高速に入出力できる
  • プログラミング言語に依存しないフォーマットで、ネストしたデータ型やユーザ定義の型などもサポート
    • Arrowを共通のストレージ (= ハブ) とし、Arrowとのシリアライザ・デシリアライザを実装するだけで、他のプログラミング言語やデータソースとのデータのやり取りができる

f:id:ohke:20200815120437p:plain

pyarrowを用いたParquetファイルの変換

このArrowのPython実装ライブラリの1つがpyarrowです。各種ファイルフォーマットやDataFrameなどに対応しており、例えば、CSVからParquet、ParquetからDataFrameといった変換もpyarrowを仲介することで可能となります。

  • pip install pyarrow でインストールできます
  • 内部的にはpyarrow.Tableオブジェクトとして扱われます

arrow.apache.org

CSVファイル -> Arrowテーブル -> Parquetファイル -> Arrowテーブル -> DataFrameオブジェクト という変換を行います。

# 上の続き

import pyarrow
import pyarrow.parquet
import pyarrow.csv

df.to_csv("test.csv", index=False)

# CSVファイルをArrow形式でロード
loaded_table = pyarrow.csv.read_csv("./test.csv")
print(loaded_table)
# pyarrow.Table
# id: int64
# name: string
# rating: double
# created_at: string

# Parquetに変換して保存
pyarrow.parquet.write_table(loaded_table, "./test.parquet")

# ParquetファイルをArrow形式でロード
loaded_parquet = pyarrow.parquet.read_table("./test.parquet")
print(loaded_parquet)
# pyarrow.Table
# id: int64
# name: string
# rating: double
# created_at: string

# ArrowをDataFrameへ変換
loaded_df = loaded_parquet.to_pandas()
print(loaded_df)
#    id    name  rating                  created_at
# 0   1  Tanaka     3.5  2020-08-15 14:15:08.543007
# 1   2  Suzuki     NaN  2020-08-15 14:15:08.543007
# 2   3    Sato     4.2  2020-08-15 14:15:08.543007

まとめ

今回はParquetファイルをPythonで入出力するための方法を2つ紹介しました。


  1. /ˈpɑːki,ˈpɑːkeɪ/ (パーキ、パーケイ) と読むそうです。