概要
便利なライブラリ見つけたので、忘れないうちにメモメモ。
aio-libsグループのライブラリですね。asyncio向けのジョブスケジューラを提供してくれます。
どんなライブラリなのかは、上のページのREADMEを見たほうが早いのですが、自分用に以下にサンプルコードをメモ。
環境
$ python --version
Python 3.7.2
インストール
$ python -m pip install aiojobs Installing collected packages: multidict, idna, yarl, async-timeout, chardet, attrs, aiohttp, aiojobs Running setup.py install for yarl ... done Successfully installed aiohttp-3.5.4 aiojobs-0.2.2 async-timeout-3.0.1 attrs-19.1.0 chardet-3.0.4 idna-2.8 multidict-4.5.2 yarl-1.3.0
使い方
import asyncio import aiojobs
を行い、次にスケジューラを生成します。
async def main(): scheduler = await aiojobs.create_scheduler()
スケジューラを生成したら、任意のタイミングでジョブを作成できます。
ジョブの作成は、spawn
メソッドに対象の coroutine
を渡して生成します。
async def coro(): await asyncio.sleep(5.0) print('coro() done') async def main(): scheduler = await aiojobs.create_scheduler() job = scheduler. spawn(coro())
これでジョブ登録完了。順に実行されていきます。
一度に走らせるジョブの数を制限する場合はスケジューラ作成時にlimit
を指定することで制限できます。
デフォルトは limit=100
となっているようです。
取得したジョブは
- active
- closed
- pending
というプロパティを持っていて、これで状況を把握することもできます。
最後にスケジューラをクローズするときになったら
await scheduler.close()
と呼び出しを行います。これにより、まだ実行されていないジョブに対しては asyncio.CancelledError
が raise されます。
なので、以下のようにしておくと、キャンセルされたかどうかを判定できますね。
async def coro(timeout, id): try: await asyncio.sleep(timeout) print(f'id:{id} done') except asyncio.CancelledError as e: print(f'cancelled id:{id}')
サンプル
githubのサンプルとほぼ同じですが。。。
import asyncio import aiojobs async def coro(timeout, id): try: await asyncio.sleep(timeout) print(f'id:{id} done') except asyncio.CancelledError as e: print(f'cancelled id:{id}') async def main(): scheduler = await aiojobs.create_scheduler() for i in range(10): await scheduler.spawn(coro(i, i)) await asyncio.sleep(5.0) await scheduler.close() if __name__ == '__main__': asyncio.run(main())
実行すると、ジョブが10個登録されますが、全部完了する前にスケジューラを落としています。
なので、残りのジョブに関しては asyncio.CancelledError
が発生します。
実行すると例えば以下のように出力されます。
id:0 done id:1 done id:2 done id:3 done id:4 done id:5 done cancelled id:9 cancelled id:6 cancelled id:7 cancelled id:8
手軽にジョブコントロールができるので便利ですねー。
サンプル2
""" aiojobs ライブラリのサンプルです。 基本的な使い方について。 REFERENCES:: https://git.io/fjn1o http://bit.ly/2GYmZqU """ import asyncio import aiojobs from trypython.common.commoncls import SampleBase from trypython.common.commonfunc import pr # noinspection PyMethodMayBeStatic class Sample(SampleBase): def exec(self): asyncio.run(self.async_main()) async def async_main(self): # --------------------------------------- # aiojobs # 基本的な使い方については # http://bit.ly/2GYmZqU # を参照のこと。 # --------------------------------------- # スケジューラ作成 scheduler = await aiojobs.create_scheduler() # ジョブを生成 jobs = [] for i in range(5): jobs.append(await scheduler.spawn(self.coro(float(i), i))) # 現在の状況表示 self.print_status('ジョブ生成直後', jobs) # 1 秒待機 await asyncio.sleep(1.0) # 現在の状況表示 self.print_status('1秒待機後', jobs) # 1 秒待機 await asyncio.sleep(1.0) # 現在の状況表示 self.print_status('2秒待機後', jobs) # スケジューラを落とす await scheduler.close() # 現在の状況表示 self.print_status('スケジューラをclose後', jobs) async def coro(self, timeout: float, task_id: int): try: await asyncio.sleep(timeout) pr('done', task_id) except asyncio.CancelledError: pr('cancelled', task_id) def print_status(self, name: str, jobs: list): pr(name, [ f'active={job.active} closed={job.closed} pending={job.pending}' for job in jobs ]) def go(): obj = Sample() obj.exec() if __name__ == '__main__': go()
try-python/aiojobs01.py at master · devlights/try-python · GitHub
過去の記事については、以下のページからご参照下さい。
- いろいろ備忘録日記まとめ
サンプルコードは、以下の場所で公開しています。
- いろいろ備忘録日記サンプルソース置き場