いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Pythonメモ-18 (joblib, 手軽に並列処理, multiprocessing, joblib.Parallel)

概要

multiprocessingモジュールよりも手軽に並列処理がサクッと書けるので重宝しているモジュール。

スクリプト作っていると、結構な頻度で大量のデータを一気に処理することが多くなります。

ちょんプロなので、時間かかってもいいのですが、何回も走らせているとその時間もムゥ・・・って

感じで嫌になってきますw

んで、並列で処理しようと考えるのですが、標準のmultiprocessingモジュールはちょっと使い方が難しい・・・

C#だと、Parallelってクラスがあって、ちょちょいと書けるので同じようなものがないのか探すと

以下のモジュールがヒット。

https://pythonhosted.org/joblib/

インストールは、pipで一発です。

pip install joblib

メモライズしたりとか機能はいろいろあるみたいですが、使いたいのはさくっと並列処理できるやつ。

joblib.Parallelってのを使う模様。

def heavy_proc(val):
    # 結構時間かかる処理とする


results = [heavy_proc(x) for x in data]

ってやってた部分を

results = joblib.Parallel(n_jobs=-1)([joblib.delayed(heavy_proc)(x) for x in data])

ってするだけ。n_jobsの部分は、-1 を指定するとマシンの全CPUで、所定の数値を与えるとその分のCPUで処理してくれます。

サンプル

いつもの全然意味のないサンプルです。よく利用するのは、処理しないといけない元ネタが大量にあって 同じ処理を繰り返して処理する、で、各処理は他のデータに依存しないっていう場合に使いやすいです。

C#でいう Parallel.For している感じ。

# coding: utf-8
"""
joblib モジュールについてのサンプルです。
"""

import datetime
import os
import random
import time

import joblib

from trypython.common.commoncls import SampleBase
from trypython.common.commonfunc import pr

NOW = datetime.datetime.now
RND = random.Random()


class Sample(SampleBase):
    def exec(self):
        start_dt = NOW()

        results = joblib.Parallel(n_jobs=-1)(
            [
                joblib.delayed(heavy_proc)(f'value-{i}', RND.randrange(1, 10))
                for i in range(1, 5)
            ]
        )

        end_dt = NOW()

        pr('job-results', results)
        pr('total elapsed', (end_dt - start_dt).seconds)


def heavy_proc(value: str, sleep_seconds: int) -> dict:
    start_dt = NOW()
    pid = os.getpid()
    pr('start', f'pid: {pid} [{value}] sleep: {sleep_seconds}')
    time.sleep(sleep_seconds)
    pr('end', f'pid: {pid} [{value}]')
    end_dt = NOW()

    return {
        'pid': pid,
        'elapsed': (end_dt - start_dt).seconds
    }


def go():
    obj = Sample()
    obj.exec()


if __name__ == '__main__':
    go()

実行すると例えば以下のようになります。

start='pid: 1719 [value-1] sleep: 1'
start='pid: 1720 [value-2] sleep: 4'
start='pid: 1721 [value-3] sleep: 9'
start='pid: 1722 [value-4] sleep: 9'
end='pid: 1719 [value-1]'
end='pid: 1720 [value-2]'
end='pid: 1722 [value-4]'
end='pid: 1721 [value-3]'
job-results=[{'elapsed': 1, 'pid': 1719},
 {'elapsed': 4, 'pid': 1720},
 {'elapsed': 9, 'pid': 1721},
 {'elapsed': 9, 'pid': 1722}]
total elapsed=9

みんなバラバラで処理しているので、 total elapsedがもっともsleepが長い 9 になっていますね。 シーケンシャルに処理していたら、 (1 + 4 + 9 + 9) の時間になっているはずです。 私のマシンはコアが4なので4並列で処理してくれたみたいです。

ソースは、以下でも見れます。

try-python/joblib01.py at master · devlights/try-python · GitHub

参考情報

https://pythonhosted.org/joblib/

GitHub - joblib/joblib: Python function as pipeline jobs.


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。