キュー¶
ソースコード: Lib/asyncio/queues.py
asyncio キューは queue
モジュールのクラス群と同じ形になるように設計されています。 asyncio キューはスレッドセーフではありませんが、それらは async/await コードから使われるために特別に設計されています。
asyncio キューのメソッドは timeout パラメータを持たないことに注意してください; タイムアウトを伴うキューを使った処理を行うには asyncio.wait_for()
関数を使ってください。
下記の 使用例 節も参照してください。
Queue¶
- class asyncio.Queue(maxsize=0)¶
先入れ先出し (FIFO) キューです。
maxsize がゼロ以下の場合、キューは無限長になります。
0
より大きい整数の場合、キューが maxsize に達するとawait put()
はget()
によってキューの要素が除去されるまでブロックします。標準ライブラリにおけるスレッドベースの
queue
モジュールと異なり、キューのサイズは常に既知であり、qsize()
メソッドを呼び出すことによって取得することができます。バージョン 3.10 で変更: loop パラメータが削除されました。
このクラスは スレッド安全ではありません。
- maxsize¶
キューに追加できるアイテム数です。
- empty()¶
キューが空ならば
True
を、そうでなければFalse
を返します。
- full()¶
キューに要素が
maxsize
個あればTrue
を返します。If the queue was initialized with
maxsize=0
(the default), thenfull()
never returnsTrue
.
- async get()¶
キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
直ちに利用できるアイテムがあるときはそれを、そうでなければ
QueueEmpty
を返します。
- async join()¶
キューにある全ての要素が取得され、処理されるまでブロックします。
未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために
task_done()
を呼び出すと減算されます。未完了のタスクのカウント値がゼロになると、join()
のブロックは解除されます。
- async put(item)¶
要素をキューに入力します。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機します。
Raises
QueueShutDown
if the queue has been shut down.
- qsize()¶
キュー内の要素数を返します。
- shutdown(immediate=False)¶
Put a
Queue
instance into a shutdown mode.The queue can no longer grow. Future calls to
put()
raiseQueueShutDown
. Currently blocked callers ofput()
will be unblocked and will raiseQueueShutDown
in the formerly blocked thread.If immediate is false (the default), the queue can be wound down normally with
get()
calls to extract tasks that have already been loaded.And if
task_done()
is called for each remaining task, a pendingjoin()
will be unblocked normally.Once the queue is empty, future calls to
get()
will raiseQueueShutDown
.If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty and the count of unfinished tasks is reduced by the number of tasks drained. If unfinished tasks is zero, callers of
join()
are unblocked. Also, blocked callers ofget()
are unblocked and will raiseQueueShutDown
because the queue is empty.Use caution when using
join()
with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.Added in version 3.13.
- task_done()¶
Indicate that a formerly enqueued work item is complete.
Used by queue consumers. For each
get()
used to fetch a work item, a subsequent call totask_done()
tells the queue that the processing on the work item is complete.join()
が現在ブロック中だった場合、全アイテムが処理されたとき (put()
でキューに追加された全アイテムのtask_done()
の呼び出しを受信したとき) に再開します。キューに追加されているアイテム数以上の呼び出しが行われたときに
ValueError
を送出します。
優先度付きのキュー¶
LIFO キュー¶
例外¶
- exception asyncio.QueueEmpty¶
この例外は
get_nowait()
メソッドが空のキューに対して呼ばれたときに送出されます。
- exception asyncio.QueueFull¶
サイズが maxsize に達したキューに対して
put_nowait()
メソッドが 呼ばれたときに送出される例外です。
使用例¶
キューを使って、並行処理を行う複数のタスクにワークロードを分散させることができます:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())