Python: Joblibで並列処理プログラミング

Pythonの並列処理では標準ライブラリであるmultiprocessingがよく使われると思いますが、「もっと気楽に実装したい」という場合に便利なのがJoblibです。

github.com

今回はJoblibを使った並列処理プログラミングについて紹介します。

基本的な使い方

使い始める前に、pipでインストールしておきます。

$ pip install joblib

実験用に、実行に3秒程度かかる関数heavy_square_taskを定義します。引数を2乗した値を返します。

from time import sleep
import timeit

def heavy_square_task(x):
    sleep(3)
    return x**2

以下のようにこの関数を4回実行すると、12秒かかります。直列処理のため、純粋に3秒×4タスクとなっています。

[heavy_square_task(x) for x in [1, 2, 3, 4]] 
# [1, 2, 3, 4]
# 12 s ± 3.26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

上の関数をJoblibで並列化させた実装が以下です。
キーはParallel関数です。

  • n_jobsオプションは並列数を指定します (デフォルト: 1)
    • 並列数2としているので、12秒から6秒まで全体の実行時間は縮められています
    • n_jobs=-1とすると、CPUコア数の数だけ並列化されます
  • delayedはタスクごとに関数と実引数をタプルにまとめるためだけのお便利関数です
    • そのため[delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]]は4つのタプルからなるリストに変換されます
from joblib import Parallel, delayed

Parallel(n_jobs=2)([delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]])
# [1, 4, 9, 16]
# 6.01 s ± 857 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

verboseオプションを1以上に設定することで、進捗状況も表示できます。渡す値は頻度を表しており、10以上にすると全てのイテレーションが出力されます。

Parallel(n_jobs=2, verbose=10)([delayed(heavy_square_task)(x) for x in [1, 2, 3, 4]])
# [Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
# [Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    3.0s
# [Parallel(n_jobs=2)]: Done   2 out of   4 | elapsed:    3.0s remaining:    3.0s
# [Parallel(n_jobs=2)]: Done   4 out of   4 | elapsed:    6.0s remaining:    0.0s
# [Parallel(n_jobs=2)]: Done   4 out of   4 | elapsed:    6.0s finished

マルチプロセスとマルチスレッドの切り替え

Parallelは、デフォルトではマルチプロセスによって並列化されます。

試しにプロセスIDとスレッドIDも返すようにしたheavy_square_task2関数を定義して、同じように実行してみますと、2つのプロセス(9077と9078)が実行されていることがわかります。

import os
import threading

def heavy_square_task2(x):
    sleep(3)
    return (x**2, os.getpid(), threading.get_ident())

Parallel(n_jobs=2)([delayed(heavy_square_task2)(x) for x in [1, 2, 3, 4]])
# [(1, 9077, 140735743546240),
#  (4, 9078, 140735743546240),
#  (9, 9077, 140735743546240),
#  (16, 9078, 140735743546240)]

スレッドで並列化する場合、Parallelのbackendオプションに"threading"を設定します。
これを実行すると、同一プロセス(8364)上で、2つのスレッドID(123145360625664, 123145365880832)で並列化されていることがわかります。

Parallel(n_jobs=2, backend='threading')([delayed(heavy_square_task2)(x) for x in [1, 2, 3, 4]])
# [(1, 8364, 123145360625664),
#  (4, 8364, 123145365880832),
#  (9, 8364, 123145360625664),
#  (16, 8364, 123145365880832)]

プロセスと比較するとスレッドの方がオーバヘッドは小さいのですが、Pythonではグローバルインタプリタロック(GIL)によって同時実行されるスレッドは1つになるため、CPUバウンドの処理は直列実行よりも遅くなることがあります。GILの公式ドキュメントはこちらです。
CPUバウンドの処理heavy_divide_taskを定義して、実験をしてみました。直列実行と比較して、2プロセスの場合は概ね倍速になっていますが、2スレッドの場合は少し遅くなっていることがわかります。

# CPUバウンドの処理
def heavy_divide_task(x):
    max_divisor = 0
    
    for i in range(2, x):
        if x % i == 0:
            max_divisor = i
            
    return max_divisor

# 直列実行(1プロセス・1スレッド)
%timeit [heavy_divide_task(x) for x in [20000000, 20000000]]
# 3.4 s ± 124 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# 2プロセス
%timeit Parallel(n_jobs=2)([delayed(heavy_divide_task)(x) for x in [20000000, 20000000]])
# 1.76 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# 2スレッド
%timeit Parallel(n_jobs=2, backend='threading')([delayed(heavy_divide_task)(x) for x in [20000000, 20000000]])
# 3.41 s ± 129 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

まとめ

今回はJoblibの基本的な使い方を紹介しました。またPythonのマルチスレッドプログラミングではGILに注意すべきということについても触れました。

Python: PandasのDataFrameを横持ち・縦持ちに変換する

PandasのDataFrameを縦持ちから横持ちにする方法とその逆(横持ちから縦持ちにする方法)についての備忘録です。

縦持ちと横持ち

縦持ちは、以下のように、カラム固定で1行に1つの値を持たせている表です。カラムをおいそれと変更できないDBのテーブルなどはこういった形かと思います。

customer_id product_id count
C1 P1 1
C1 P2 2
C2 P2 2
C2 P2 1
C3 P3 3

一方で、横持ちは、カラム数が可変で1行に複数の値をもたせている表です。行列はこういう形になるでしょう。

P1 P2 P3
C1 1 2 0
C2 0 3 0
C3 0 0 3

縦持ちから横持ちへ変換する

それでは縦持ちとなっている以下のデータを横持ちへ変換します。

import pandas as pd
import numpy as np

orders_df = pd.DataFrame({'customer_id': ['C1', 'C1', 'C2', 'C2', 'C3'], 'product_id': ['P1', 'P2', 'P2', 'P2', 'P3'], 'count': [1, 2, 2, 1, 3]})

f:id:ohke:20180721120245p:plain

横持ちにするときは、Pandasのpivot_tableメソッドを使います。 インデックスにcustomer_id、カラムにはproduct_idを指定することで、customer_id×product_idの横持ちテーブルとなります。

pivot_orders_df = orders_df.pivot_table(values=['count'], index=['customer_id'], columns=['product_id'], aggfunc='sum')

f:id:ohke:20180721131300p:plain

いくつかオプション引数があります。

  • aggfuncには、同じcustomer_idとproduct_idの値を集約するためのnumpyの関数を指定します
    • 例えば、customer_id='C2'とprodct_id='P2'の組み合わせが2つありますが、合計値3で埋められてます
    • デフォルトでは"mean"ですが、"sum"、"count"、"max"、"min"などが使えます
  • fill_valueで、欠測値を指定します (デフォルトではNaN)

横持ちから縦持ちへ変換する

上で横持ちにしたテーブルを縦持ちに戻します。

縦持ちにするときは、stackメソッドが使えます。

orders_df = pivot_orders_df.stack()

f:id:ohke:20180721133105p:plain

dropna=Falseとすると、NaNの行となります。

orders_df = pivot_orders_df.stack(dropna=False)

f:id:ohke:20180721133222p:plain

元のテーブルの通り、インデックスではなくカラムにする場合は、reset_indexメソッドでカラム化します。

orders_df = pivot_orders_df.stack().reset_index()

f:id:ohke:20180721133621p:plain

Scrapyでけ日記をクローリングする (3. parseへ任意の値を渡す方法とエラーハンドリング)

前回・前々回に引き続き、Scrapyを使ってこのブログのクローリングを行います。

github.com

今回は細々としたところで、Spiderクラスのparseメソッドへ値を受け渡す方法と、エラーハンドリングについてです。Spiderの実装は前々回の投稿も参考にしてみてください。

ohke.hateblo.jp

なお、以下のフォルダ構造となってます。

f:id:ohke:20180707152304p:plain

parseへ任意の値を渡す

parseでスクレイピングして得た値を別のページのparse処理でも使い回したい、など、parseメソッドに任意の値を渡して利用したいケースがあります。

そういった場合、metaプロパティを使います。

  • リクエストを作るときにrequest.meta['key'] = valueで値を設定
  • parseメソッド内ではvalue = response.meta['key']で値を取得

2018年のエントリタイトルと投稿日を、1月から順にスクレイピングしていく例を示します。archive_spider.pyに実装しています。

  • http://ohke.hateblo.jp/archive/2018/{月}へ1月から順にアクセスしていきます
  • start_requestsでは、request.meta['month']に初期値1を渡してリクエストしています
  • parseでは、response.meta['month']で今見ている月を取り出し、インクリメントして次のリクエストを作っています
import scrapy
from ..items import PostItem

class ArchiveSpider(scrapy.Spider):
    name = 'archive_spider'

    url_format = 'https://ohke.hateblo.jp/archive/2018/{}'
    filename = '2018.txt'

    def start_requests(self):
        start_month = 1
        url = self.url_format.format(start_month)

        request = scrapy.Request(url=url, callback=self.parse)
        request.meta['month'] = start_month  # 値の設定

        yield request

    def parse(self, response):
        month = response.meta['month']  # 値の取得

        title_list = response.css('a.entry-title-link::text').extract()
        date_list = response.xpath('//time/@datetime').extract()

        with open(self.filename, 'a') as f:
            for t, d in zip(title_list, date_list):
                f.write('{}\t{}\n'.format(d, t))

        next_month = month + 1
        next_url = self.url_format.format(month+1)
        # オプションmetaに辞書形式でも渡せます
        yield scrapy.Request(url=next_url, callback=self.parse, meta={'month': month})

エラーハンドリング

スクレイピングをしていると、リンクが切れていたり、権限のためにアクセスできない、などの問題は避けられません。

Scrapyでは、リトライなどの基本的な設定をsettings.pyでできます。

  • RETRY_TIMES: リトライ回数 (デフォルトでは、2)
  • RETRY_HTTP_CODES: リトライするHTTPステータスコード (デフォルトでは、[500, 502, 503, 504, 408])
  • HTTPERROR_ALLOWED_CODES: エラーとして処理するHTTPステータスコード (デフォルトは [] で全てエラー扱い)

Spiderごとに設定することもでき、例えば400エラーをparse内でハンドリングして正常終了する場合は、以下のような実装になります。
monthが13の場合、存在しない月なので400が返ってきます。それをエラーとせず、parse内でreturnして終了させています。

class ArchiveSpider(scrapy.Spider):
    name = 'archive_spider'
    handle_httpstatus_list = [400]  # ステータス400の場合はエラーにしない

    # 省略

    def parse(self, response):
        # ステータスが400の場合は、クローリングを止める
        if response.status in [400]:
            print(response.status)
            return

        month = response.meta['month']

        title_list = response.css('a.entry-title-link::text').extract()
        date_list = response.xpath('//time/@datetime').extract()

        with open(self.filename, 'a') as f:
            for t, d in zip(title_list, date_list):
                f.write('{}\t{}\n'.format(d, t))

        next_month = month + 1
        next_url = self.url_format.format(month+1)
        yield scrapy.Request(url=next_url, callback=self.parse, meta={'month': month})

まとめ

今回もScrapyの使い方について、parseへのパラメータの渡し方と、エラーハンドリニングについて触れました。

参考文献

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-

Pythonクローリング&スクレイピング -データ収集・解析のための実践開発ガイド-