け日記

最近はPythonでいろいろやってます

Python: Joblibで並列処理プログラミング

Pythonの並列処理では標準ライブラリであるmultiprocessingがよく使われると思いますが、「もっと気楽に実装したい」という場合に便利なのがJoblibです。

github.com

今回はJoblibを使った並列処理プログラミングについて紹介します。

基本的な使い方

使い始める前に、pipでインストールしておきます。

$ pip install joblib

実験用に、実行に3秒程度かかる関数heavy_square_taskを定義します。引数を2乗した値を返します。

from time import sleep
import timeit

def heavy_square_task(x):
    sleep(3)
    return x**2

以下のようにこの関数を4回実行すると、12秒かかります。直列処理のため、純粋に3秒×4タスクとなっています。

[heavy_square_task(x) for x in [1, 2, 3, 4]] 
# [1, 2, 3, 4]
# 12 s ± 3.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

上の関数をJoblibで並列化させた実装が以下です。
キーはParallel関数です。

  • n_jobsオプションは並列数を指定します (デフォルト: 1)
    • 並列数2としているので、12秒から6秒まで全体の実行時間は縮められています
    • n_jobs=-1とすると、CPUコア数の数だけ並列化されます
  • delayedはタスクごとに関数と実引数をタプルにまとめるためだけのお便利関数です
    • そのため[delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]]は4つのタプルからなるリストに変換されます
import joblib

joblib.Parallel(n_jobs=2)([delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]])
# [1, 4, 9, 16]
# 6.01 s ± 857 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

verboseオプションを1以上に設定することで、進捗状況も表示できます。渡す値は頻度を表しており、10以上にすると全てのイテレーションが出力されます。

joblib.Parallel(n_jobs=2, verbose=10)([delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]])
# [Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
# [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    3.0s
# [Parallel(n_jobs=2)]: Done   2 out of   4 | elapsed:    3.0s remaining:    3.0s
# [Parallel(n_jobs=2)]: Done   4 out of   4 | elapsed:    6.0s remaining:    0.0s
# [Parallel(n_jobs=2)]: Done   4 out of   4 | elapsed:    6.0s finished

マルチプロセスとマルチスレッドの切り替え

Parallelは、デフォルトではマルチプロセスによって並列化されます。

試しにプロセスIDとスレッドIDも返すようにしたheavy_square_task2関数を定義して、同じように実行してみますと、2つのプロセス(9077と9078)が実行されていることがわかります。

import os
import threading

def heavy_square_task2(x):
    sleep(3)
    return (x**2, os.getpid(), threading.get_ident())

joblib.Parallel(n_jobs=2)([delayed(heavy_square_task2)(x) for x in [1, 2, 3, 4]])
# [(1, 9077, 140735743546240),
#  (4, 9078, 140735743546240),
#  (9, 9077, 140735743546240),
#  (16, 9078, 140735743546240)]

スレッドで並列化する場合、Parallelのbackendオプションに"threading"を設定します。
これを実行すると、同一プロセス(8364)上で、2つのスレッドID(123145360625664, 123145365880832)で並列化されていることがわかります。

joblib.Parallel(n_jobs=2, backend='threading')([delayed(heavy_square_task2)(x) for x in [1, 2, 3, 4]])
# [(1, 8364, 123145360625664),
#  (4, 8364, 123145365880832),
#  (9, 8364, 123145360625664),
#  (16, 8364, 123145365880832)]

プロセスと比較するとスレッドの方がオーバヘッドは小さいのですが、Pythonではグローバルインタプリタロック(GIL)によって同時実行されるスレッドは1つになるため、CPUバウンドの処理は直列実行よりも遅くなることがあります。GILの公式ドキュメントはこちらです。
CPUバウンドの処理heavy_divide_taskを定義して、実験をしてみました。直列実行と比較して、2プロセスの場合は概ね倍速になっていますが、2スレッドの場合は少し遅くなっていることがわかります。

# CPUバウンドの処理
def heavy_divide_task(x):
    max_divisor = 0
    
    for i in range(2, x):
        if x % i == 0:
            max_divisor = i
            
    return max_divisor

# 直列実行(1プロセス・1スレッド)
%timeit [heavy_divide_task(x) for x in [20000000, 20000000]]
# 3.4 s ± 124 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# 2プロセス
%timeit joblib.Parallel(n_jobs=2)([delayed(heavy_divide_task)(x) for x in [20000000, 20000000]])
# 1.76 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# 2スレッド
%timeit joblib.Parallel(n_jobs=2, backend='threading')([delayed(heavy_divide_task)(x) for x in [20000000, 20000000]])
# 3.41 s ± 129 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

まとめ

今回はJoblibの基本的な使い方を紹介しました。またPythonのマルチスレッドプログラミングではGILに注意すべきということについても触れました。