-
Notifications
You must be signed in to change notification settings - Fork 557
Add async task background worker #4591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add async task background worker #4591
Conversation
❌ 17 Tests Failed:
View the top 3 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
1261319
to
2896602
Compare
7ada4b3
to
1a129f7
Compare
2896602
to
268ea1a
Compare
1a129f7
to
97c5e3d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really great work. I have some comments for improvement.
Add an abstract bass class for implementation of the background worker. This was done to provide a shared interface for the current implementation of a threaded worker in the sync context as well as the upcoming async task-based worker implementation. GH-4578
Add a new factory method instead of direct instatiation of the threaded background worker. This allows for easy extension to other types of workers, such as the upcoming task-based async worker. GH-4578
Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a synchronous blocking flush. GH-4578
Move the flush_async down to the concrete subclass to not break existing testing. This makes sense, as this will only really be needed by the async worker anyway and therefore is not shared logic. GH-4578
Coroutines have a return value, however the current function signature for the worker methods does not accomodate for this. Therefore, this signature was changed. GH-4578
1fbf85f
to
ef780f3
Compare
Add a new implementation of the worker interface, implementing the worker as an async task. This is to be used by the upcoming async transport. GH-4581
Refactor the flush method in the async worker to use the async_flush coroutine. GH-4581
…unctions Add a check to see wheter callbacks are awaitable coroutines or functions, as coroutines need to be awaited. GH-4581
…coroutines Coroutines do not return None, therefore it is necessary to consider this in the callback parameter of the worker. Previously, only callbacks with return Type None were accepted. GH-4581
Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic. GH-4581
Changed kill to also use the _TERMINATOR sentinel, so the queue is still drained to this point on kill instead of cancelled immediately. This should also fix potential race conditions with flush_async. GH-4581
Add proper type annotation to worker task list to fix linting problems GH-4581
Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation. GH-4581
97c5e3d
to
b5eda0e
Compare
The flush method in the transport enqueues a sync callback for the worker, therefore the check needs to be here after all. GH-4581
The callbacks passed to the worker from the transport are all async now, so this is currently not needed. GH-4581
CI lint issue is in an unrelated file, not sure why it is popping up. |
…' into srothh/async-task-worker
self._queue.task_done() | ||
break | ||
# Firing tasks instead of awaiting them allows for concurrent requests | ||
task = asyncio.create_task(self._process_callback(callback)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one is asyncio.create_task
while all the others use self._loop
directly. Is that intentional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_target is the function actually running in the task, whereas other parts where tasks get instantiated are called from the transport. This means that running it directly with asyncio will reference the correct loop that is executing target. I believe using self._loop here could introduce a race, as kill() resets the loop reference to None, which can happen before the task _submit reaches the TERMINATOR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(for this reason also the other functions introduce an is_alive() check)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very nice work, will merge this
one question I need to think about is:
- if we want multiple concurrent requests at the same time at all
- if we do, a way to maybe control the number of requests at the same time (but without adding another layer of queuing complexity)
7889074
into
srothh/transport-class-hierarchy
Add a new implementation of the transport background worker based on an async task. This worker mostly mirrors the same functionality as the thread-based worker, with the exception that it exposes a non-blocking async flush (which can be awaited from an async context). Furthermore, the worker itself is not thread-safe and should be called using run_coroutine_threadsafe or similar when called from another thread (this is fixed and handled by the transport). I have kept the fork check from the threaded worker, but I am not sure if it is necessary as forking in an async application would also break the event loop.
GH-4581