
Add asyncio.Queue.__aiter__ #119154


Closed
Zac-HD opened this issue May 18, 2024 · 25 comments
Labels
stdlib (Python modules in the Lib dir) · topic-asyncio · type-feature (A feature request or enhancement)

Comments

@Zac-HD
Contributor

Zac-HD commented May 18, 2024

Feature or enhancement

Proposal:

Over the last few years, Trio and AnyIO users have proven out several design patterns using channels as async iterables. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.

An asyncio.Queue is almost identical to a channel-pair, especially with the .shutdown() method added in Python 3.13. I therefore propose that we add an .__aiter__ method, to better support such design patterns without subclassing or a generator helper function, with an implementation as described in #119154 (comment).

Links to previous discussion of this feature:

python/peps#3782 (review) suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.

Limitations

Making Queue aiterable reaches API parity for single-producer, single-consumer patterns. In multi-producer and/or multi-consumer patterns, without a .clone() method it is the user's responsibility to shut down the queue when the last task is done. I do not propose to add .clone(), but we could include that link in the docs as an option for multi-producer patterns if desired.
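
As a hedged sketch of that responsibility (not part of the proposal itself; the task structure and names below are my own), a multi-producer setup can shut the queue down once the last producer finishes:

import asyncio

async def produce(queue, items):
    for item in items:
        await queue.put(item)

async def consume(queue):
    while True:
        try:
            item = await queue.get()
        except asyncio.QueueShutDown:  # new in Python 3.13
            return
        print("consumed", item)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(consume(queue))
        async with asyncio.TaskGroup() as producers:
            producers.create_task(produce(queue, range(3)))
            producers.create_task(produce(queue, "abc"))
        # All producers are now done; shutting down lets the consumer's get()
        # raise QueueShutDown once the remaining items have been drained.
        queue.shutdown()

asyncio.run(main())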

Linked PRs

Decision

#120925 (comment) explains the decision not to add *.Queue.iter(), and #120491 (comment) explains that asyncio.Queue.__aiter__ was rejected for the same reasons. Instead, the docs will show how users can write a wrapper like this.
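
A minimal sketch of such a wrapper, assuming the Python 3.13 shutdown()/QueueShutDown API (not necessarily the exact snippet that landed in the docs):

import asyncio

async def queue_as_aiter(queue):
    # Yield items from the queue until it is shut down.
    while True:
        try:
            item = await queue.get()
        except asyncio.QueueShutDown:
            return
        yield item
        queue.task_done()

Consumers can then write async for item in queue_as_aiter(q): ... without subclassing Queue.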

@Zac-HD added the type-feature label May 18, 2024
@github-project-automation github-project-automation bot moved this to Todo in asyncio May 18, 2024
@graingert
Contributor

graingert commented May 19, 2024

It should be:

    async def __aiter__(self):
        try:
            while True:
                yield await self.get()
        except asyncio.QueueShutDown:
            return

Or it might be even better to implement __anext__ and do

def __aiter__(self):
    return self

@gvanrossum
Member

Probably should also have a task_done call.

@Zac-HD
Contributor Author

Zac-HD commented May 20, 2024

I've included those suggestions in the top comment, sticking with __aiter__ because I don't see a clean way to support task_done() from __anext__ - and it feels more reasonable to describe queues as aiterables than aiterators.

@gvanrossum
Member

I suspect the real problem is that the scope of the try/except is too large, and should only go around the await queue.get() part? (And you need s/queue/self/.)

@gvanrossum
Member

Maybe this?

async def __aiter__(self):
    while True:
        try:
            item = await self.get()
        except asyncio.QueueShutDown:
            return
        yield item
        self.task_done()

A philosophical question is, if yield item raises (i.e., the caller throws an exception into the generator), should task_done() be called or not?

Also, does it look like you're proposing a legit use case for async generators?

@Zac-HD
Contributor Author

Zac-HD commented May 20, 2024

Also, does it look like you're proposing a legit use case for async generators?

Yes! Generators (sync or async) are a really elegant syntax for defining iterables, and the problems motivating PEP-789 only occur if you yield (and thus suspend execution) within certain context managers. If you don't have a context manager in the generator function, and you're careful about finalization timing for PEP-533 reasons, I think they're great.
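
As a hypothetical illustration (open_conn and its cursor API below are invented names), the problematic shape is an async generator that yields while a context manager is still open:

async def rows(url):
    async with open_conn(url) as conn:  # hypothetical resource-holding context manager
        async for row in conn.cursor("SELECT ..."):
            yield row  # suspends while the connection is still held open

The queue wrapper discussed here has no context manager inside the generator body, so it avoids that case.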

@serhiy-storchaka
Member

In general, you should always close an asynchronous generator. So the correct use of the proposed feature would be:

async with contextlib.aclosing(aiter(queue)) as it:
    async for item in it:
        # process item

instead of simple

async for item in queue:
    # process item

We should also consider adding a method or a global function which returns an iterator instead of making Queue an iterable. That would allow emitting a warning if the iterator was not closed.
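
A rough sketch of what such an iterator-returning helper might warn about (the class name and the warning mechanism are assumptions, not an agreed design):

import asyncio
import warnings

class _QueueIterator:
    def __init__(self, queue):
        self._queue = queue
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self._queue.get()
        except asyncio.QueueShutDown:
            self._closed = True
            raise StopAsyncIteration

    async def aclose(self):
        # Called by contextlib.aclosing(); marks the iterator as cleanly closed.
        self._closed = True

    def __del__(self):
        if not self._closed:
            warnings.warn("asyncio.Queue iterator was never closed", ResourceWarning)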

It is not clear what to do with task_done() if the iteration was stopped (exception or break/return) before exhausting the iterator.

@graingert
Contributor

Rather than using an async generator, you could use a class with an __anext__; then you don't need to worry about exceptions being thrown in, or about the async generator not being closed.

@gvanrossum
Member

Like this:

class AsyncQueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = await self.queue.get()
        except asyncio.QueueShutDown:
            raise StopAsyncIteration
        else:
            return item


class Queue:
    ...
    def __aiter__(self):
        return AsyncQueueIterator(self)
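
With that shape, usage would look like this (assuming the feature existed; handle() is a placeholder):

async for item in queue:  # Queue.__aiter__ returns an AsyncQueueIterator
    await handle(item)

Because the iterator is a plain object rather than a generator, there is no suspended frame to finalize, which is graingert's point above.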

@serhiy-storchaka
Member

iter() has a two-argument form which allows you to specify a callable and the stop value. There have been many discussions about allowing the stop exception to be specified as well. The same applies to aiter(). If that were implemented, you could simply write:

async for item in aiter(queue.get, stop_exc=asyncio.QueueShutDown):
    # process item

Of course, such a feature could be used in many other cases, not only with a queue.

There is another meaning of "iterating a queue":

async for item in aiter(queue.get_nowait, stop_exc=(asyncio.QueueEmpty, asyncio.QueueShutDown)):
    # process item

With this idiom you have a choice. __aiter__ can only implement one of them.
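
Since aiter() does not currently accept a stop exception, the blocking idiom can be approximated today with a small helper (the name aiter_until is an assumption); the get_nowait() variant would need a synchronous equivalent, because get_nowait() is not awaitable, as noted below:

import asyncio

async def aiter_until(aget, stop_exc):
    # Repeatedly await aget(), yielding results until stop_exc is raised.
    while True:
        try:
            yield await aget()
        except stop_exc:
            return

# Usage: async for item in aiter_until(queue.get, asyncio.QueueShutDown): ...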

@nineteendo
Contributor

nineteendo commented Jun 14, 2024

I think the first implementation is the most useful; otherwise you need to create all tasks up front:

consumed everything # incorrect
0 -> queue
1 -> queue
2 -> queue
3 -> queue
4 -> queue
produced everything

Also, it isn't even asynchronous:

class QueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.queue.get_nowait()
        except (QueueEmpty, QueueShutDown):
            raise StopIteration

It's impossible to implement this asynchronously:

  File "/Users/wannes/Documents/GitHub/cpython/add-asyncio.Queue.__aiter__/Lib/asyncio/queues.py", line 43, in __anext__
    return await self.queue.get_nowait()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object int can't be used in 'await' expression

@serhiy-storchaka
Member

Oh, right, get_nowait() is not asynchronous, so this would be a synchronous iterator. We can add asyncio.Queue.__iter__, but it would be very confusing and would not translate to synchronous queues.

So I propose reviving the old idea about the stop exception for iter() and aiter() (there may already be an open issue for this, maybe with a PR).

If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them and decide which of them to make iterable: either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.

@nineteendo
Contributor

nineteendo commented Jun 14, 2024

I see, so queue.Queue would be an iterable, and asyncio.Queue an asynchronous iterable.

@Zac-HD
Contributor Author

Zac-HD commented Jun 18, 2024

If after this we still want to make asyncio.Queue iterable, it can be implemented much more easily. But there are many other queue implementations in the stdlib. We should consider them and decide which of them to make iterable: either make them all iterable (with the same semantics), or set a rule for why some queues are iterable and others are not.

I think it's reasonable to make queue types (a)iterable independently of whether (a)iter supports customized stop-exceptions - if nothing else, it's a much smaller feature. I'm not taking a position on whether it's worth making sync queue classes iterable, but if you decide yes I did give a list of what I'd do for each in the top comment:

python/peps#3782 (review) suggested that queue.Queue could also be iterable. If we're extending this to synchronous classes I'd also include multiprocessing.Queue and multiprocessing.SimpleQueue. I'd omit multiprocessing.connection.Connection, due to the byte-level send/recv methods, and queue.SimpleQueue because without a .close() or .shutdown() method there's no clean way to shut down.

@nineteendo
Contributor

nineteendo commented Jun 18, 2024

I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.

@nineteendo
Contributor

Could you elaborate on how this would work for multiprocessing.Queue and multiprocessing.SimpleQueue?

@willingc
Contributor

I have made AsyncQueueIterator private. Then we can simplify it later without needing to go through the deprecation cycle.

This change is being reviewed and discussed in https://github.com/python/cpython/pull/120491/files/0a8a72b59b42abc67c8084053bf3e4cf2259bbb7#r1645878198 to determine if this class is necessary.

@nineteendo

This comment was marked as outdated.

@vstinner
Member

This issue has many people involved and many comments; I don't think that it should be closed.

@nineteendo
Contributor

nineteendo commented Jun 24, 2024

I personally think the alternative solution is better, as it provides all functionality and is consistent for both queues.
Additionally, it follows the naming convention of the other functions, making it intuitive and discoverable by IDE suggestions.
But opinions vary, so if I can't convince you, we can move forward with just asyncio.Queue.__aiter__().

I also think many of the points rhettinger raised equally apply to asyncio.Queue.__aiter__():

  • With shutdown being new, it is better to keep it explicit.
  • Currently the get() is an obvious marker of where blocking occurs. Code will be harder to review with it hidden behind an abstraction.
  • Currently the get() provides non-blocking [...] options which become inaccessible with this.

Minor respellings typically don't benefit users. In this case, I think it makes them worse off by hiding two essential elements that should be explicit (the get() call and catching the exception).

I am clear that we don't have to do this. It only combines a while-get with a try/except. There is no new capability being added. It is perfectly reasonable to wait to see how people use shutdown().

All the PR does is hide both a get() and a try/except [...] Hiding the two steps makes users worse-off. Besides adding overhead, it hides a blocking call which in my experience is something that needs to be explicit. [...] And as noted above, it distances the user from [...] non-blocking behaviors which typically need special handling distinct from the case of a shutdown. I also don't see how this would work well with code that wants to join() a queue where a worker needs to call task_done(). In short, users that want to do anything other than one specific case are made worse-off by trying to use the for-loop variant. It won't even be obvious that they have options.
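
For comparison, the explicit worker loop those comments favour keeps the get() call, the shutdown handling, and task_done() all visible; a sketch (handle() is a placeholder):

import asyncio

async def worker(queue):
    while True:
        try:
            item = await queue.get()  # the blocking point is explicit
        except asyncio.QueueShutDown:  # shutdown handling is explicit
            break
        try:
            await handle(item)
        finally:
            queue.task_done()  # keeps Queue.join() working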

@willingc
Contributor

willingc commented Jun 24, 2024

@nineteendo Please slow down and listen to the guidance being shared by core developers. When creating a new feature, opening a PR and being able to code the feature are only small parts of the development process.

First and foremost, the maintainers of the module need to consider the impact of adding a new feature to:

  • the module itself
  • the other parts of the language that interact with the new feature
  • whether it breaks backward compatibility
  • whether it changes the API that other projects depend upon
  • how much cognitive load and complexity the new feature adds for users

To sum it up, new features are generally added slowly after considering the impacts.

Please also recognize that many core developers are volunteers who have many responsibilities. Being respectful of people's time is very important. Multiple requests, messages, and PRs will not move your PR forward faster.

@nineteendo
Contributor

nineteendo commented Jun 24, 2024

Please slow down

I wanted to quickly provide an alternative implementation before the current implementation gets merged.
If I no longer get a response, I will wait until the PR is 1 month old before asking again.

First and foremost, the maintainers of the module need to consider the impact of adding a new feature

That's precisely why new features first need to be discussed on Discourse, which Guido mentioned specifically. I don't understand why there's an exception for this suggestion. There's no mention of __aiter__ on Discourse after Guido's comment.

Multiple requests, messages, and PRs will not move your PR forward faster.

It wasn't my intention to request a review again (I merged both PRs, as Zac suggested opening at most 2).
Sorry for the chaos, I focused too much on the title of the issue being too narrow.

Take your time to evaluate; the PRs are drafts, so they can't be merged.

@novoxd

novoxd commented Feb 14, 2025

Hi folks. Is this the actual thread about the asyncio.Queue.__aiter__ thing? I have seen 3 other issues and 2 PRs from @nineteendo; each one mentions the others and was closed because of some comments, which gets a little messy.

Just wondering why it is so complicated to add this feature. It looks like an extension of the asyncio.Queue class and would not break backward compatibility or interact with anything else.

Anyway, making the queue async iterable would be extra useful.

@picnixz added the stdlib label Feb 14, 2025
@nineteendo
Contributor

Yes, this is the issue for the 3 PRs. The other 2 issues were closed as duplicates of this one.

@Zac-HD
Contributor Author

Zac-HD commented Mar 10, 2025

Closing this issue as feature-declined, as Raymond describes in #120925 (comment) (and earlier in #120503 (comment)).

@Zac-HD closed this as not planned Mar 10, 2025
@github-project-automation github-project-automation bot moved this from Todo to Done in asyncio Mar 10, 2025