Add async task background worker #4591

Merged

Conversation

srothh
Member

@srothh srothh commented Jul 17, 2025

Add a new implementation of the transport background worker based on an async task. This worker mostly mirrors the thread-based worker, except that it exposes a non-blocking async flush that can be awaited from an async context. The worker itself is not thread-safe; when called from another thread, it should be invoked via run_coroutine_threadsafe or similar (the transport handles this). I have kept the fork check from the threaded worker, but I am not sure it is necessary, since forking in an async application would also break the event loop.

GH-4581
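
As an illustration of the thread-safety note above, here is a minimal sketch of handing work to a loop-bound object from another thread. `SketchWorker` and `run_demo` are hypothetical names used only for this example and are not the SDK's classes; only `asyncio.run_coroutine_threadsafe` is the real standard-library call mentioned in the description.

```python
import asyncio
import threading


class SketchWorker:
    """Hypothetical stand-in for a loop-bound worker (not the SDK's class)."""

    def __init__(self):
        self.processed = []

    async def submit(self, item):
        # Only ever runs on the event loop thread, so no lock is used here.
        self.processed.append(item)
        return len(self.processed)


def run_demo():
    loop = asyncio.new_event_loop()
    worker = SketchWorker()
    # Run the event loop in its own thread.
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # Called from a different thread: schedule the coroutine on the loop
        # and block on the returned concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(worker.submit("event"), loop)
        count = future.result(timeout=5)
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        loop.close()
    return count, worker.processed
```

The calling thread never touches the worker's state directly; it only waits on the future returned by `run_coroutine_threadsafe`.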

codecov bot commented Jul 17, 2025

❌ 17 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 20742 | 17 | 20725 | 1098 |
View the top 3 failed test(s) by shortest run time
tests.integrations.redis.test_redis::test_redis_pipeline[True-True-expected_first_ten1]
Stack Traces | 0.083s run time
.../integrations/redis/test_redis.py:62: in test_redis_pipeline
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis::test_pii_data_redacted
Stack Traces | 0.084s run time
.../integrations/redis/test_redis.py:120: in test_pii_data_redacted
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'
tests.integrations.redis.test_redis_cache_module::test_cache_prefixes
Stack Traces | 0.084s run time
.../integrations/redis/test_redis_cache_module.py:209: in test_cache_prefixes
    connection = FakeStrictRedis()
.tox/py3.11-redis-v4/lib/python3.11.../site-packages/fakeredis/_connection.py:185: in __init__
    super().__init__(**kwds)
E   TypeError: Redis.__init__() got an unexpected keyword argument 'lib_name'

@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 1261319 to 2896602 Compare July 21, 2025 09:58
@srothh srothh force-pushed the srothh/async-task-worker branch from 7ada4b3 to 1a129f7 Compare July 21, 2025 10:42
@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 2896602 to 268ea1a Compare July 23, 2025 14:02
@srothh srothh force-pushed the srothh/async-task-worker branch from 1a129f7 to 97c5e3d Compare July 23, 2025 14:04
@srothh srothh marked this pull request as ready for review July 24, 2025 07:49
@srothh srothh requested a review from a team as a code owner July 24, 2025 07:49
Member

@antonpirker antonpirker left a comment

Really great work. I have some comments for improvement.

srothh added 6 commits July 28, 2025 10:55
Add an abstract base class for implementations of the background worker. This provides a shared interface for the current
threaded worker in the sync context as well as the upcoming async task-based worker.

GH-4578
Add a new factory method instead of direct instantiation of the threaded background worker.
This allows for easy extension to other types of workers, such as the upcoming task-based async worker.

GH-4578
Add a new flush_async method to worker ABC. This is necessary because the async transport cannot use a
synchronous blocking flush.

GH-4578
Move the flush_async down to the concrete subclass to not break existing testing. This makes sense,
as this will only really be needed by the async worker anyway and therefore is not shared logic.

GH-4578
Coroutines have a return value, but the current function signature for the worker methods does not
accommodate this. Therefore, the signature was changed.

GH-4578
@srothh srothh force-pushed the srothh/worker-class-hierarchy branch from 1fbf85f to ef780f3 Compare July 28, 2025 08:55
srothh added 10 commits July 30, 2025 11:41
Add a new implementation of the worker interface, implementing the worker as an async task. This is
to be used by the upcoming async transport.

GH-4581
Refactor the flush method in the async worker to use the async_flush coroutine.

GH-4581
…unctions

Add a check to see whether callbacks are awaitable coroutines or plain functions, as coroutines need to be awaited.

GH-4581
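
A minimal sketch of the kind of check this commit message describes, assuming the callback takes no arguments. `run_callback` is a hypothetical helper written for this example, not the function in the PR, and the PR may perform the check differently (for instance on the function rather than on its result).

```python
import asyncio
import inspect


async def run_callback(callback):
    """Call a callback and await its result only if the result is awaitable."""
    result = callback()
    if inspect.isawaitable(result):
        # Coroutine functions return a coroutine object that must be awaited.
        result = await result
    return result


def sync_callback():
    return "sync"


async def async_callback():
    return "async"
```

Both `asyncio.run(run_callback(sync_callback))` and `asyncio.run(run_callback(async_callback))` go through the same code path, so a caller does not need to know which kind of callback it holds.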
…coroutines

Calling a coroutine function returns a coroutine object rather than None, so the callback parameter of the worker has to account for this. Previously,
only callbacks with return type None were accepted.

GH-4581
Enable concurrent callbacks on async task worker by firing them as a task rather than awaiting them. A done callback handles the necessary queue and exception logic.

GH-4581
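
The pattern described in this commit message can be sketched roughly as follows. This is an assumption-laden illustration, not the PR's code: `process_concurrently` and `on_done` are hypothetical names, and the real worker keeps its queue and task set on the instance.

```python
import asyncio


async def process_concurrently(callbacks):
    """Fire each callback as a task and let a done callback do the bookkeeping."""
    queue = asyncio.Queue()
    tasks = set()
    errors = []

    def on_done(task):
        # Runs once per finished task: record failures, then mark the item done.
        tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            errors.append(task.exception())
        queue.task_done()

    for callback in callbacks:
        queue.put_nowait(callback)

    while not queue.empty():
        callback = queue.get_nowait()
        # Not awaited here, so the next callback can start right away.
        task = asyncio.create_task(callback())
        tasks.add(task)  # keep a strong reference until the task finishes
        task.add_done_callback(on_done)

    # Returns once task_done() has been called for every queued item.
    await queue.join()
    return errors
```

Because `task_done()` is called from the done callback rather than right after `get`, `queue.join()` only returns when the callbacks themselves have finished, which is what a flush needs.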
Changed kill to also use the _TERMINATOR sentinel, so that on kill the queue is still drained up to that point instead of being cancelled immediately. This should also fix potential race conditions with flush_async.

GH-4581
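
A minimal sketch of sentinel-based shutdown, assuming a plain `asyncio.Queue`. The `_TERMINATOR` name comes from the commit message; `consume` and `demo` are hypothetical names for this example.

```python
import asyncio

_TERMINATOR = object()  # sentinel object; name taken from the commit message


async def consume(queue, handled):
    """Process items in order until the sentinel is reached."""
    while True:
        item = await queue.get()
        if item is _TERMINATOR:
            queue.task_done()
            break
        handled.append(item)
        queue.task_done()


async def demo():
    queue = asyncio.Queue()
    handled = []
    consumer = asyncio.create_task(consume(queue, handled))
    for i in range(3):
        queue.put_nowait(i)
    # "kill": items queued before the sentinel are still processed.
    queue.put_nowait(_TERMINATOR)
    await consumer
    return handled
```

Compared with cancelling the consumer task, the sentinel gives a well-defined cut-off: everything enqueued before the kill is handled, and nothing after it.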
Add proper type annotation to worker task list to fix linting problems

GH-4581
Refactor worker implementation to simplify callback processing, fix pending calculation and improve queue initialisation.

GH-4581
@srothh srothh force-pushed the srothh/async-task-worker branch from 97c5e3d to b5eda0e Compare July 30, 2025 12:07
cursor[bot]

This comment was marked as outdated.

srothh added 2 commits July 30, 2025 15:01
The flush method in the transport enqueues a sync callback for the worker, therefore the check needs to be here after all.

GH-4581
The callbacks passed to the worker from the transport are all async now, so this is currently not needed.

GH-4581
@srothh
Member Author

srothh commented Jul 31, 2025

CI lint issue is in an unrelated file, not sure why it is popping up.

Base automatically changed from srothh/worker-class-hierarchy to srothh/transport-class-hierarchy August 12, 2025 13:13
self._queue.task_done()
break
# Firing tasks instead of awaiting them allows for concurrent requests
task = asyncio.create_task(self._process_callback(callback))
Member

this one is asyncio.create_task while all the others use self._loop directly. Is that intentional?

Member Author

_target is the function actually running in the task, whereas the other places where tasks get instantiated are called from the transport. This means that creating the task directly with asyncio will reference the correct loop, namely the one executing _target. I believe using self._loop here could introduce a race, as kill() resets the loop reference to None, which can happen before the task _submit reaches the TERMINATOR.

Member Author

(for this reason also the other functions introduce an is_alive() check)
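
To illustrate the point about loop references, here is a small sketch. `LoopHolder` and `target` are hypothetical names; the example only demonstrates the standard-library behavior that `asyncio.create_task` uses the currently running loop and does not need a stored reference.

```python
import asyncio


class LoopHolder:
    """Hypothetical object that stores a loop reference, like self._loop."""

    def __init__(self):
        self.loop = None


async def target(holder):
    holder.loop = asyncio.get_running_loop()
    # Simulate the stored reference being cleared while the task still runs.
    holder.loop = None
    # asyncio.create_task resolves the running loop itself.
    task = asyncio.create_task(asyncio.sleep(0, result="done"))
    same_loop = task.get_loop() is asyncio.get_running_loop()
    return await task, same_loop
```

Code that goes through the stored reference would have to guard against it being `None`, which is the role an `is_alive()`-style check plays.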

Member

@sl0thentr0py sl0thentr0py left a comment

very nice work, will merge this

one question I need to think about is:

  • if we want multiple concurrent requests at the same time at all
  • if we do, a way to maybe control the number of requests at the same time (but without adding another layer of queuing complexity)
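
For the second point, one possible approach (not something this PR implements) is to bound concurrency with an `asyncio.Semaphore`, which adds no extra queue. `run_bounded` and `guarded` are hypothetical names, and the peak counter exists only to make the bound observable in this sketch.

```python
import asyncio


async def run_bounded(callbacks, limit):
    """Run all callbacks, with at most `limit` of them in flight at once."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def guarded(callback):
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            try:
                await callback()
            finally:
                running -= 1

    await asyncio.gather(*(guarded(cb) for cb in callbacks))
    return peak  # highest number of callbacks that ran at the same time
```

Tasks beyond the limit simply wait on the semaphore, so the existing queue stays the only queue.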

@sl0thentr0py sl0thentr0py merged commit 7889074 into srothh/transport-class-hierarchy Aug 12, 2025
121 of 125 checks passed
@sl0thentr0py sl0thentr0py deleted the srothh/async-task-worker branch August 12, 2025 15:30