いろいろ備忘録日記

主に .NET とか Go とか Flutter とか Python絡みのメモを公開しています。

Pythonメモ-100 (aiojobs)(asyncio向けのジョブスケジューラライブラリ)

概要

便利なライブラリ見つけたので、忘れないうちにメモメモ。

github.com

aio-libsグループのライブラリですね。asyncio向けのジョブスケジューラを提供してくれます。

どんなライブラリなのかは、上のページのREADMEを見たほうが早いのですが、自分用に以下にサンプルコードをメモ。

環境

$ python --version
Python 3.7.2

インストール

$ python -m pip install aiojobs
Installing collected packages: multidict, idna, yarl, async-timeout, chardet, attrs, aiohttp, aiojobs
  Running setup.py install for yarl ... done
Successfully installed aiohttp-3.5.4 aiojobs-0.2.2 async-timeout-3.0.1 attrs-19.1.0 chardet-3.0.4 idna-2.8 multidict-4.5.2 yarl-1.3.0

使い方

import asyncio
import aiojobs

を行い、次にスケジューラを生成します。

async def main():
    scheduler = await aiojobs.create_scheduler()

スケジューラを生成したら、任意のタイミングでジョブを作成できます。

ジョブの作成は、spawn メソッドに対象の coroutine を渡して生成します。

async def coro():
    await asyncio.sleep(5.0)
    print('coro() done')

async def main():
    scheduler = await aiojobs.create_scheduler()
    job = scheduler. spawn(coro())

これでジョブ登録完了。順に実行されていきます。 一度に走らせるジョブの数を制限する場合はスケジューラ作成時にlimitを指定することで制限できます。 デフォルトは limit=100 となっているようです。

取得したジョブは

  • active
  • closed
  • pending

というプロパティを持っていて、これで状況を把握することもできます。

最後にスケジューラをクローズするときになったら

await scheduler.close()

と呼び出しを行います。これにより、まだ実行されていないジョブに対しては asyncio.CancelledError が raise されます。

なので、以下のようにしておくと、キャンセルされたかどうかを判定できますね。

async def coro(timeout, id):
    try:
        await asyncio.sleep(timeout)
        print(f'id:{id} done')
    except asyncio.CancelledError as e:
        print(f'cancelled id:{id}')    

サンプル

githubのサンプルとほぼ同じですが。。。

import asyncio
import aiojobs

async def coro(timeout, id):
    try:
        await asyncio.sleep(timeout)
        print(f'id:{id} done')
    except asyncio.CancelledError as e:
        print(f'cancelled id:{id}')

async def main():
    scheduler = await aiojobs.create_scheduler()
    for i in range(10):
        await scheduler.spawn(coro(i, i))
    await asyncio.sleep(5.0)
    await scheduler.close()

if __name__ == '__main__':
    asyncio.run(main())

実行すると、ジョブが10個登録されますが、全部完了する前にスケジューラを落としています。

なので、残りのジョブに関しては asyncio.CancelledError が発生します。

実行すると例えば以下のように出力されます。

id:0 done
id:1 done
id:2 done
id:3 done
id:4 done
id:5 done
cancelled id:9
cancelled id:6
cancelled id:7
cancelled id:8

手軽にジョブコントロールができるので便利ですねー。

サンプル2

"""
aiojobs ライブラリのサンプルです。

基本的な使い方について。

REFERENCES:: https://git.io/fjn1o
             http://bit.ly/2GYmZqU
"""
import asyncio

import aiojobs

from trypython.common.commoncls import SampleBase
from trypython.common.commonfunc import pr


# noinspection PyMethodMayBeStatic
class Sample(SampleBase):
    def exec(self):
        asyncio.run(self.async_main())

    async def async_main(self):
        # ---------------------------------------
        # aiojobs
        #   基本的な使い方については
        #   http://bit.ly/2GYmZqU
        #   を参照のこと。
        # ---------------------------------------
        # スケジューラ作成
        scheduler = await aiojobs.create_scheduler()

        # ジョブを生成
        jobs = []
        for i in range(5):
            jobs.append(await scheduler.spawn(self.coro(float(i), i)))

        # 現在の状況表示
        self.print_status('ジョブ生成直後', jobs)

        # 1 秒待機
        await asyncio.sleep(1.0)

        # 現在の状況表示
        self.print_status('1秒待機後', jobs)

        # 1 秒待機
        await asyncio.sleep(1.0)

        # 現在の状況表示
        self.print_status('2秒待機後', jobs)

        # スケジューラを落とす
        await scheduler.close()

        # 現在の状況表示
        self.print_status('スケジューラをclose後', jobs)

    async def coro(self, timeout: float, task_id: int):
        try:
            await asyncio.sleep(timeout)
            pr('done', task_id)
        except asyncio.CancelledError:
            pr('cancelled', task_id)

    def print_status(self, name: str, jobs: list):
        pr(name, [
            f'active={job.active} closed={job.closed} pending={job.pending}'
            for job in jobs
        ])


def go():
    obj = Sample()
    obj.exec()


if __name__ == '__main__':
    go()

try-python/aiojobs01.py at master · devlights/try-python · GitHub


過去の記事については、以下のページからご参照下さい。

  • いろいろ備忘録日記まとめ

devlights.github.io

サンプルコードは、以下の場所で公開しています。

  • いろいろ備忘録日記サンプルソース置き場

github.com

github.com