gh-120924: Simplify consumers by adding asyncio.Queue.iter() and asyncio.Queue.iter_nowait() #120927


Closed
wants to merge 42 commits

Conversation

nineteendo
Contributor

@nineteendo nineteendo commented Jun 23, 2024

Purpose

Currently, consuming items from a queue is quite involved. You need to:

  1. run an infinite while loop in each consumer
  2. call queue.task_done() after processing each item
  3. keep references to all worker tasks in a list
  4. join the queue
  5. cancel all worker tasks
  6. wait until all worker tasks have been cancelled

By adding asyncio.Queue.iter() and asyncio.Queue.iter_nowait(), this becomes much simpler: you only need to call queue.shutdown(). We use methods so that queue.Queue can offer the same interface.

Overview of changes

  • Calling asyncio.Queue.iter() returns an asynchronous generator that iterates over the items in the queue.
  • Calling asyncio.Queue.iter_nowait() returns a generator that iterates over the items in the queue without blocking.
  • A private _AsyncQueueIterator class has been added to handle the asynchronous iteration.
  • The example in the documentation is greatly simplified by this addition.
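As a rough illustration of how such methods could behave, here is a hypothetical sketch (not the PR's actual code; the IterQueue name is made up, and iter() relies on asyncio.QueueShutDown from Python 3.13):

```python
import asyncio

class IterQueue(asyncio.Queue):
    """Hypothetical sketch of the proposed iteration methods."""

    async def iter(self):
        # Async generator: yield items until the queue is shut down.
        while True:
            try:
                item = await self.get()
            except asyncio.QueueShutDown:  # raised once the queue is shut down and drained (3.13+)
                return
            yield item
            self.task_done()

    def iter_nowait(self):
        # Plain generator: yield whatever is currently queued, never blocking.
        while True:
            try:
                item = self.get_nowait()
            except asyncio.QueueEmpty:
                return
            yield item
            self.task_done()
```

A consumer then reduces to `async for item in queue.iter(): ...`, with task_done() handled inside the generator.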

Example

Without iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    while True:
        i = await queue.get()
        print(f'queue -> {i}')
        await asyncio.sleep(.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    tasks = []
    for _ in range(1):  # a single consumer; increase the range for more workers
        task = asyncio.create_task(consumer(queue))
        tasks.append(task)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))

    print('produced everything')
    await queue.join()
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    print('consumed everything')

asyncio.run(main())

With iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    async for i in queue.iter():
        print(f'queue -> {i}')
        await asyncio.sleep(.3)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg1:
        tg1.create_task(consumer(queue))
        async with asyncio.TaskGroup() as tg2:
            tg2.create_task(producer(queue))

        queue.shutdown()
        print('produced everything')

    print('consumed everything')

asyncio.run(main())

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: https://cpython-previews--120927.org.readthedocs.build/

@nineteendo
Contributor Author

cc @Zac-HD

@Zac-HD
Contributor

Zac-HD commented Jun 23, 2024

#120491 remains my preferred approach. Given @rhettinger's review of #120503, I still support making asyncio.Queue aiterable, but would drop any changes to the synchronous queue classes.

More generally, I suggest opening only one or two PRs at a time - having four very similar PRs with multiple issues makes it much harder to track what's going on.

@willingc
Contributor

Removing review requests on this PR and marking DO NOT MERGE. Let's finish the discussion on #120491 (comment)

@nineteendo nineteendo closed this Jun 24, 2024
@nineteendo nineteendo deleted the add-asyncio.Queue.iter branch June 24, 2024 07:19

3 participants