いろいろ備忘録日記

主に .NET と Python絡みのメモを公開しています。

Pythonメモ-18 (joblib, 手軽に並列処理, multiprocessing, joblib.Parallel)

概要

multiprocessingモジュールよりも手軽に並列処理がサクッと書けるので重宝しているモジュール。

スクリプト作っていると、結構な頻度で大量のデータを一気に処理することが多くなります。

ちょんプロなので、時間かかってもいいのですが、何回も走らせているとその時間もムゥ・・・って

感じで嫌になってきますw

んで、並列で処理しようと考えるのですが、標準のmultiprocessingモジュールはちょっと使い方が難しい・・・

C#だと、Parallelってクラスがあって、ちょちょいと書けるので同じようなものがないのか探すと

以下のモジュールがヒット。

https://pythonhosted.org/joblib/

インストールは、pipで一発です。

pip install joblib

メモライズしたりとか機能はいろいろあるみたいですが、使いたいのはさくっと並列処理できるやつ。

joblib.Parallelってのを使う模様。

def heavy_proc(val):
    # 結構時間かかる処理とする


results = [heavy_proc(x) for x in data]

ってやってた部分を

results = joblib.Parallel(n_jobs=-1)([joblib.delayed(heavy_proc)(x) for x in data])

ってするだけ。n_jobsの部分は、-1 を指定するとマシンの全CPUで、所定の数値を与えるとその分のCPUで処理してくれます。

サンプル

いつもの全然意味のないサンプルです。よく利用するのは、処理しないといけない元ネタが大量にあって 同じ処理を繰り返して処理する、で、各処理は他のデータに依存しないっていう場合に使いやすいです。

C#でいう Parallel.For している感じ。

# coding: utf-8
"""
joblib モジュールについてのサンプルです。
"""

import datetime
import os
import random
import time

import joblib

from trypython.common.commoncls import SampleBase
from trypython.common.commonfunc import pr

NOW = datetime.datetime.now
RND = random.Random()


class Sample(SampleBase):
    def exec(self):
        start_dt = NOW()

        results = joblib.Parallel(n_jobs=-1)(
            [
                joblib.delayed(heavy_proc)(f'value-{i}', RND.randrange(1, 10))
                for i in range(1, 5)
            ]
        )

        end_dt = NOW()

        pr('job-results', results)
        pr('total elapsed', (end_dt - start_dt).seconds)


def heavy_proc(value: str, sleep_seconds: int) -> dict:
    start_dt = NOW()
    pid = os.getpid()
    pr('start', f'pid: {pid} [{value}] sleep: {sleep_seconds}')
    time.sleep(sleep_seconds)
    pr('end', f'pid: {pid} [{value}]')
    end_dt = NOW()

    return {
        'pid': pid,
        'elapsed': (end_dt - start_dt).seconds
    }


def go():
    obj = Sample()
    obj.exec()


if __name__ == '__main__':
    go()

実行すると例えば以下のようになります。

start='pid: 1719 [value-1] sleep: 1'
start='pid: 1720 [value-2] sleep: 4'
start='pid: 1721 [value-3] sleep: 9'
start='pid: 1722 [value-4] sleep: 9'
end='pid: 1719 [value-1]'
end='pid: 1720 [value-2]'
end='pid: 1722 [value-4]'
end='pid: 1721 [value-3]'
job-results=[{'elapsed': 1, 'pid': 1719},
 {'elapsed': 4, 'pid': 1720},
 {'elapsed': 9, 'pid': 1721},
 {'elapsed': 9, 'pid': 1722}]
total elapsed=9

みんなバラバラで処理しているので、 total elapsedがもっともsleepが長い 9 になっていますね。 シーケンシャルに処理していたら、 (1 + 4 + 9 + 9) の時間になっているはずです。 私のマシンはコアが4なので4並列で処理してくれたみたいです。

ソースは、以下でも見れます。

try-python/joblib01.py at master · devlights/try-python · GitHub

参考情報

https://pythonhosted.org/joblib/

GitHub - joblib/joblib: Python function as pipeline jobs.


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。

Pythonメモ-17 (psutil, システム情報取得, 空きメモリ取得)

概要

たまに空きメモリ量をスクリプトで取りたいときに、よく忘れるのでメモ。

psutil モジュール自体は、メモリ量だけじゃなくてシステム情報いろいろ取得できる超便利なモジュール。

ネットワークの情報取得とかは重宝してます。

メモリ量は

psutil.virtual_memory()

から取得できます。

サンプル

# coding: utf-8
"""
psutil モジュールについてのサンプルです。
"""
import psutil as ps

from trypython.common.commoncls import SampleBase
from trypython.common.commonfunc import pr


class Sample(SampleBase):
    def exec(self):
        # ---------------------------------------------------------------
        # [link]
        # https://github.com/giampaolo/psutil
        # ---------------------------------------------------------------
        # psutil モジュールは
        #   - CPU時間
        #   - メモリ情報
        #   - ディスク情報
        #   - ネットワーク情報
        #   - センサー情報
        #   - ユーザ情報
        #   - プロセス情報
        # といったシステム情報が取得できる便利なモジュール。
        # ---------------------------------------------------------------
        # 空きメモリを取得するには、virtual_memory() を使用して取得できる
        # svmem オブジェクトの available プロパティから取得できる。
        # ---------------------------------------------------------------
        virtual_memory = ps.virtual_memory()
        available_memory_bytes = virtual_memory.available  # type: int

        available_memory_gb = self._to_gb(float(available_memory_bytes))
        pr('available_memory', self._adjust(available_memory_gb))

    def _to_kb(self, byte_size: float) -> float:
        return byte_size / 1024

    def _to_mb(self, byte_size: float) -> float:
        return self._to_kb(byte_size) / 1024

    def _to_gb(self, byte_size: float) -> float:
        return self._to_mb(byte_size) / 1024

    def _adjust(self, value: float) -> float:
        return round(value, 2)


def go():
    obj = Sample()
    obj.exec()


if __name__ == '__main__':
    go()

サンプルは以下からも見れます。

try-python/psutil01.py at master · devlights/try-python · GitHub

参考情報

system - How to get current CPU and RAM usage in Python? - Stack Overflow

GitHub - giampaolo/psutil: A cross-platform process and system utilities module for Python


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。

Pythonメモ-16 (paramiko, ssh, UnicodeDecodeError, stdoutとstderrがテキストモード, monkey_patch関数)

概要

python で sshとかsftp処理しようとすると大抵出て来る paramiko モジュールさん。

超便利なのですが、sshでやり取りする処理を書いて

標準出力を受け取ろうとすると、UnicodeDecodeError が発生するときがあります。

なんでなのかというと、paramiko内部処理の exec_command 関数にて

stdin のみがバイナリモードで、stdoutとstderrがテキストモードで処理しようとしているためです。

たまたま、euc-jpなlinuxマシン用に処理書いててエラーが発生したので知りました。

で、ソースみてみると、モードの部分はパラメータにもなっていないので、ネットで情報探してみると

以下を発見。

Monkey patch for paramiko issue 291 · GitHub

なるほど。置き換えればいいのねってことで、以下自分用の処理つくったのでメモメモ。

サンプル

# coding: utf-8
"""
paramiko の SSHClient.exec_command は内部で stdin のみ binary-mode で
処理しているが、stdout と stderr はテキストモードで処理している。

そのため、euc-jp な環境で動かすと UnicodeDecodeError が発生してしまう。

それを防ぐために、stdout, stderr を binary-mode で処理するパッチ関数を以下に定義している。

以下の情報を参考にした。
  https://gist.github.com/smurn/4d45a51b3a571fa0d35d
"""
import paramiko


def monkey_patch():
    paramiko.SSHClient.exec_command = _patched_exec_command


def _patched_exec_command(
        self,
        command: str,
        bufsize: int = -1,
        timeout: int = None,
        get_pty: bool = False,
        environment: dict = None,
) -> tuple:
    """
    元の exec_command の処理そのままで stdout, stderr を binary-mode で処理します。
    """
    chan = self._transport.open_session(timeout=timeout)
    if get_pty:
        chan.get_pty()
    chan.settimeout(timeout)
    if environment:
        chan.update_environment(environment)
    chan.exec_command(command)
    stdin = chan.makefile('wb', bufsize)
    stdout = chan.makefile('rb', bufsize)
    stderr = chan.makefile_stderr('rb', bufsize)
    return stdin, stdout, stderr

あとは、paramiko使う前にmonkey_patchします。

# coding: utf-8

import paramiko
import trypython.extlib.paramiko_monkeypatch as paramiko_patch

paramiko_patch.monkey_patch()

client = paramiko.SSHClient()
client.load_system_host_keys()
client.connect(hostname='xxx.xxx.xxx.xxx', username='user', password='passwd')
sin, sout, serr = client.exec_command('ls -l')

for x in sout:
    print(x.decode('euc-jp'), end='')

client.close()

参考情報

Monkey patch for paramiko issue 291 · GitHub

UnicodeDecodeError: 'utf8' codec can't decode byte 0x83 in position 20: invalid start byte · Issue #707 · paramiko/paramiko · GitHub

paramiko/client.py at master · paramiko/paramiko · GitHub

Welcome to Paramiko! — Paramiko documentation


過去の記事については、以下のページからご参照下さい。

サンプルコードは、以下の場所で公開しています。