Add asyncio.Queue.__aiter__
#119154
Comments
It should be:

```python
async def __aiter__(self):
    try:
        while True:
            yield await self.get()
    except asyncio.QueueShutDown:
        return
```

Or it might be even better to implement:

```python
def __aiter__(self):
    return self
```
Probably should also have a `task_done` call.
I've included those suggestions in the top comment, sticking with …
I suspect the real problem is that the scope of the try/except is too large, and should only go around the `await self.get()` call.
Maybe this?

```python
async def __aiter__(self):
    while True:
        try:
            item = await self.get()
        except asyncio.QueueShutDown:
            return
        yield item
        self.task_done()
```

A philosophical question is, if … Also, does it look like you're proposing a legit use case for async generators?
Yes! Generators (sync or async) are a really elegant syntax for defining iterables, and the problems motivating PEP-789 only occur if you …
In general, you should always close an asynchronous generator. So the correct use of the proposed feature would be:

```python
async with contextlib.aclosing(aiter(queue)) as it:
    async for item in it:
        ...  # process item
```

instead of simply:

```python
async for item in queue:
    ...  # process item
```

We should also consider adding a method or a global function which returns an iterator, instead of making Queue an iterable. That would allow emitting a warning if the iterator was not closed. It is not clear what to do with …
Rather than using an async generator, you could use a class with an `__anext__` method.
Like this:

```python
class AsyncQueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            item = await self.queue.get()
        except asyncio.QueueShutDown:
            raise StopAsyncIteration
        else:
            return item


class Queue:
    ...

    def __aiter__(self):
        return AsyncQueueIterator(self)
```
```python
async for item in aiter(queue.get, stop_exc=asyncio.QueueShutDown):
    ...  # process item
```

Of course, such a feature could be used in many other cases, not only with a queue. There is another meaning of "iterating a queue":

```python
async for item in aiter(queue.get_nowait, stop_exc=(asyncio.QueueEmpty, asyncio.QueueShutDown)):
    ...  # process item
```

With this idiom you have a choice.
I think the first implementation is the most useful, otherwise you need to create all tasks up front:

Also, it isn't even asynchronous:

```python
class QueueIterator:
    def __init__(self, queue):
        self.queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.queue.get_nowait()
        except (QueueEmpty, QueueShutDown):
            raise StopIteration
```

It's impossible to implement this asynchronously:

```
File "/Users/wannes/Documents/GitHub/cpython/add-asyncio.Queue.__aiter__/Lib/asyncio/queues.py", line 43, in __anext__
    return await self.queue.get_nowait()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: object int can't be used in 'await' expression
```
Oh, right, … So I propose to revive the old idea about the stop exception for … If after this we still want to make …
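For reference, since the built-in `aiter()` takes no stop-exception argument, that idea can be approximated in user code today; the helper name and signature below are hypothetical:

```python
async def aiter_until(getter, stop_exc):
    # Hypothetical stand-in for an aiter(getter, stop_exc=...) builtin:
    # call ``getter`` repeatedly and end the iteration once ``stop_exc`` is raised.
    while True:
        try:
            yield await getter()
        except stop_exc:
            return

# Usage with Python 3.13's queue shutdown:
#     async for item in aiter_until(queue.get, asyncio.QueueShutDown):
#         ...
```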
I see, so …
I think it's reasonable to make queue types (a)iterable independently of whether (a)iter supports customized stop-exceptions; if nothing else, it's a much smaller feature. I'm not taking a position on whether it's worth making sync queue classes iterable, but if you decide yes, I did give a list of what I'd do for each in the top comment.
I have made …
Could you elaborate on how this would work for …
This change is being reviewed and discussed in https://github.com/python/cpython/pull/120491/files/0a8a72b59b42abc67c8084053bf3e4cf2259bbb7#r1645878198 to determine if this class is necessary.
This issue has many people involved and many comments; I don't think that it should be closed.
I personally think the alternative solution is better, as it provides all the functionality and is consistent for both queues. I also think many of the points rhettinger raised equally apply to …
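For the synchronous side, the same idiom can be sketched on top of `queue.Queue.shutdown()` and `queue.ShutDown` from Python 3.13; this is only an illustration of what "consistent for both queues" could look like, not the implementation from any of the linked PRs:

```python
import queue

def iter_queue(q):
    # Yield items from a queue.Queue until it is shut down and drained (3.13+).
    while True:
        try:
            item = q.get()
        except queue.ShutDown:
            return
        yield item
        q.task_done()
```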
@nineteendo Please slow down and listen to the guidance being shared by core developers. When creating a new feature, opening a PR and being able to code the feature are only a small part of the development process. First and foremost, the maintainers of the module need to consider the impact of adding a new feature to:

To sum it up, new features are generally added slowly after considering the impacts. Please also recognize that many core developers are volunteers who have many responsibilities. Being respectful of people's time is very important. Multiple requests, messages, and PRs will not move your PR forward faster.
I wanted to quickly provide an alternative implementation before the current implementation gets merged.
That's precisely why new features first need to be discussed on Discourse, which Guido mentioned specifically.

I don't understand why there's an exception for this suggestion. There's no mention of …
It wasn't my intention to request a review again (I merged both PRs, as Zac suggested opening at most 2). Take your time to evaluate; the PRs are drafts, so they can't be merged.
Hi folks, is this the actual thread for this feature? Just wondering why it is so complicated to add. It looks like an extension of … Anyway, making the queue async iterable will be extra useful.
Yes, this is the issue for the 3 PRs. The other 2 issues were closed as duplicates of this one.
Closing this issue as feature-declined, as Raymond describes in #120925 (comment) (and earlier in #120503 (comment)).
Feature or enhancement
Proposal:
Over the last few years, Trio and AnyIO users have proven out several design patterns using channels as async iterables. For example, having a context manager yield an async iterable avoids the motivating problems of both PEP-533 and PEP-789.
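As a concrete illustration of that pattern with today's asyncio primitives (all names here are made up for the example, and the final `async for` assumes the `__aiter__` proposed just below):

```python
import asyncio
import contextlib

@contextlib.asynccontextmanager
async def open_events():
    # A context manager that yields an async iterable: the producer runs in
    # a task group, and shutting the queue down ends the consumer's loop.
    queue = asyncio.Queue()

    async def produce():
        try:
            for i in range(5):
                await queue.put(i)
        finally:
            queue.shutdown()  # Python 3.13+

    async with asyncio.TaskGroup() as tg:
        tg.create_task(produce())
        yield queue

async def main():
    async with open_events() as events:
        async for event in events:  # relies on the proposed Queue.__aiter__
            print(event)

asyncio.run(main())
```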
An `asyncio.Queue` is almost identical to a channel-pair, especially with the `.shutdown()` method added in Python 3.13. I therefore propose that we add an `.__aiter__` method, to better support such design patterns without subclassing or a generator helper function, with an implementation as described in #119154 (comment).

Links to previous discussion of this feature:
python/peps#3782 (review) suggested that `queue.Queue` could also be iterable. If we're extending this to synchronous classes I'd also include `multiprocessing.Queue` and `multiprocessing.SimpleQueue`. I'd omit `multiprocessing.connection.Connection`, due to the byte-level send/recv methods, and `queue.SimpleQueue`, because without a `.close()` or `.shutdown()` method there's no clean way to shut down.

Limitations
Making `Queue` async-iterable reaches API parity for single-producer, single-consumer patterns. In multi-producer and/or multi-consumer patterns, without a `.clone()` method it is the user's responsibility to shut down the queue when the last task is done. I do not propose to add `.clone()`, but we could include that link in the docs as an option for multi-producer patterns if desired.
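A sketch of what that responsibility looks like in a multi-producer setup (illustrative code only; the consumer's `async for` assumes the proposed `__aiter__`):

```python
import asyncio

async def produce(queue, items):
    for item in items:
        await queue.put(item)

async def consume(queue):
    async for item in queue:  # assumes the proposed Queue.__aiter__
        print(item)

async def main():
    queue = asyncio.Queue()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(consume(queue))
        # Without a .clone() method, the user has to notice when the last
        # producer finishes and shut the queue down so consumers stop.
        async with asyncio.TaskGroup() as producers:
            producers.create_task(produce(queue, range(3)))
            producers.create_task(produce(queue, range(3, 6)))
        queue.shutdown()

asyncio.run(main())
```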
Linked PRs

- Make `asyncio.Queue` an asynchronous iterable: #120491
- Add `*.Queue.iter()` and `*.Queue.iter_nowait()`: #120925
- Make `queue.Queue` an iterable: #120503

Decision

#120925 (comment) explains the decision not to add `*.Queue.iter()`, and #120491 (comment) that `asyncio.Queue.__aiter__` was rejected for the same reasons. Instead, the docs will show how users can write a wrapper like this.
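A minimal sketch of such a wrapper, along the lines of the snippets discussed above (the function name is illustrative; the eventual docs wording may differ):

```python
import asyncio

async def queue_items(queue):
    # Wrap an asyncio.Queue as an async iterator, stopping once the queue
    # has been shut down via Queue.shutdown() (Python 3.13+).
    while True:
        try:
            item = await queue.get()
        except asyncio.QueueShutDown:
            return
        yield item
        queue.task_done()

# Usage:
#     async for item in queue_items(queue):
#         ...
```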