gh-119154: Simplify consumers by making asyncio.Queue an asynchronous iterable #120491

Closed
wants to merge 33 commits into from

Contributor

@nineteendo nineteendo commented Jun 14, 2024

Purpose

Currently, consuming items from a queue takes a lot of boilerplate. You need to:

  1. use an infinite while loop
  2. call queue.task_done() after processing each item
  3. add all worker tasks to a list
  4. join the queue
  5. cancel all worker tasks
  6. wait until all worker tasks are cancelled

By making asyncio.Queue an asynchronous iterable, this becomes a lot easier: consumers iterate over the queue with async for, and you only need to call queue.shutdown() once everything has been produced.

Overview of changes

  • Calling asyncio.Queue.__aiter__() returns an asynchronous iterator over the items in the queue
  • A private _AsyncQueueIterator has been added to handle the iteration
  • The example in the documentation is greatly simplified by the new addition
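
For readers who want a feel for the mechanism, here is a minimal sketch of how such an iterator could work on a stock Python 3.13+ interpreter, where asyncio.Queue.shutdown() and asyncio.QueueShutDown already exist. The names QueueIterator, producer, consumer and demo are hypothetical and chosen for illustration; this is not the code from this pull request, and it deliberately does not call task_done(), so queue.join() is not meaningful here.

```python
import asyncio


class QueueIterator:
    """Hypothetical wrapper: lets `async for` drain an asyncio.Queue."""

    def __init__(self, queue):
        self._queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            # Waits for the next item; raises QueueShutDown once the queue
            # has been shut down and is empty (Python 3.13+).
            return await self._queue.get()
        except asyncio.QueueShutDown:
            # Translate "queue is finished" into the end of the loop.
            raise StopAsyncIteration from None


async def producer(queue, count):
    for i in range(count):
        await queue.put(i)


async def consumer(queue, received):
    async for item in QueueIterator(queue):
        received.append(item)


async def demo():
    queue = asyncio.Queue()
    received = []
    async with asyncio.TaskGroup() as tg:
        tg.create_task(consumer(queue, received))
        await producer(queue, 5)
        # Non-immediate shutdown: queued items can still be retrieved.
        queue.shutdown()
    return received
```

Running asyncio.run(demo()) on Python 3.13 or later should return the produced items in order. The consumer needs no cancellation: after shutdown(), get() keeps returning queued items and only raises QueueShutDown once the queue is empty.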

Example

Without iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    while True:
        i = await queue.get()
        print(f'queue -> {i}')
        await asyncio.sleep(.3)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    tasks = []
    for _ in range(1):
        task = asyncio.create_task(consumer(queue))
        tasks.append(task)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue))

    print('produced everything')
    await queue.join()
    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)
    print('consumed everything')

asyncio.run(main())

With iteration

import asyncio

async def producer(queue):
    for i in range(5):
        print(f'{i} -> queue')
        await queue.put(i)
        await asyncio.sleep(.1)

async def consumer(queue):
    async for i in queue:
        print(f'queue -> {i}')
        await asyncio.sleep(.3)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg1:
        tg1.create_task(consumer(queue))
        async with asyncio.TaskGroup() as tg2:
            tg2.create_task(producer(queue))

        queue.shutdown()
        print('produced everything')

    print('consumed everything')

asyncio.run(main())

Output

0 -> queue
queue -> 0
1 -> queue
2 -> queue
queue -> 1
3 -> queue
4 -> queue
produced everything
queue -> 2
queue -> 3
queue -> 4
consumed everything

📚 Documentation preview 📚: library/asyncio-queue.html

Co-authored-by: Guido van Rossum <guido@python.org>
@nineteendo nineteendo changed the title from "Add asyncio.Queue.__aiter__()" to "gh-119154: Add asyncio.Queue.__aiter__()" Jun 14, 2024
@nineteendo nineteendo marked this pull request as ready for review June 14, 2024 11:32
@nineteendo nineteendo requested a review from rhettinger as a code owner June 14, 2024 11:32
@nineteendo
Contributor Author

cc @gvanrossum, @Zac-HD

@rhettinger rhettinger removed their request for review June 14, 2024 17:24
@gvanrossum
Member

I hope someone will review this. Right now I am not available as a reviewer, sorry.

Contributor

@Zac-HD Zac-HD left a comment

Two bits of idiom noted below, but overall this looks great to me - thanks, @nineteendo!

@nineteendo nineteendo requested a review from Zac-HD June 15, 2024 05:30
@nineteendo nineteendo requested a review from 1st1 as a code owner June 18, 2024 18:07
Co-authored-by: Kumar Aditya <kumaraditya@python.org>
Contributor

@willingc willingc left a comment

Hi @nineteendo, Thanks for the PR. I left a comment on the doc and will take a closer look at the implementation after I hear back about the expected behavior.

@willingc
Contributor

Hi @nineteendo @Zac-HD @kumaraditya303 @itamaro @ambv, @carljm,

I've noticed a recent trend that the asyncio PR titles and body text are often very terse, without much context on what the change is and why.

To help reviewers and to give context to future contributors, let's raise the bar a bit on how we document asyncio PRs.

Write clear titles and descriptions for your pull requests so that reviewers can quickly understand what the pull request does. In the pull request body, include:

  • the purpose of the pull request
  • an overview of what changed

Two sentences (one with purpose: why; one with change: what) would be very helpful for reviewers for both individual and stacked task PRs. It would likely also help us turn around PRs in a more timely manner. Thanks!

@nineteendo nineteendo changed the title from "gh-119154: Add asyncio.Queue.__aiter__()" to "gh-119154: Simplify consumers by making asyncio.Queue an asynchronous iterable" Jun 20, 2024
@nineteendo
Contributor Author

nineteendo commented Jun 20, 2024

I've adjusted the PR title and body text. I hope it's a bit clearer now. The example is based on the current one in the documentation.

@nineteendo nineteendo marked this pull request as draft June 23, 2024 17:20
@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

I want to think some more about what would be the best approach to support this for both synchronous and asynchronous queues.

@nineteendo nineteendo marked this pull request as ready for review June 23, 2024 20:34
@nineteendo
Contributor Author

nineteendo commented Jun 23, 2024

I finished the alternative pull request: #120925. Decide if you prefer it over the current pull request.
I think an explicit method is better, but that's just my opinion.

@willingc
Contributor

@nineteendo Please refrain from opening multiple pull requests at one time for the same issue. Instead, iterate on one PR toward a preferred solution. Here is the preferred approach:

  • Core devs and reviewers expect you to submit what you believe is the best solution as one PR.
  • Respond to feedback on the open PR and iterate on it.
  • If needed, add more tests to prove that the approach is sound.

Thanks!

@nineteendo
Contributor Author

nineteendo commented Jun 24, 2024

Sorry for the trouble. The original issue technically didn't apply to synchronous queues (as per the title).

@nineteendo nineteendo changed the title from "gh-119154: Simplify consumers by making asyncio.Queue an asynchronous iterable" to "gh-120947: Simplify consumers by making asyncio.Queue an asynchronous iterable" Jun 24, 2024
@nineteendo nineteendo changed the title from "gh-120947: Simplify consumers by making asyncio.Queue an asynchronous iterable" to "gh-119154: Simplify consumers by making asyncio.Queue an asynchronous iterable" Jun 24, 2024
@nineteendo nineteendo marked this pull request as draft June 24, 2024 13:44
@rhettinger
Contributor

Marking this as closed for the reasons listed in #120925

Thanks for the work on this. It was a plausible idea but queues are more versatile than the iterator protocol allows.

@rhettinger rhettinger closed this Sep 25, 2024
@nineteendo nineteendo deleted the add-asyncio.Queue.__aiter__ branch September 25, 2024 05:39