Fixed #35629 -- Added support for async database connections and cursors. #18408
Conversation
I'm not sufficiently familiar with the internals of the ORM/database layer to comment on much of the internal logic here, but the general shape of this code and approach seems to make sense to me. (I left a few questions, nothing terribly blocking.)
ServerCursor implements the logic required to declare and scroll
through named cursors.

Mixing ClientCursorMixin in wouldn't be necessary if Cursor allowed to
Perhaps out of scope for this PR, but I don't think it's clear where this Cursor comes from. I think it's defined in psycopg3, but I'm not confident in that.
This should also be clarified in the ServerSideCursor docstring above.
Anything under the Database module comes from psycopg2 or psycopg3. In this specific case, ClientCursorMixin and AsyncClientCursor come from psycopg3.
ServerSideCursor is our custom class that derives from psycopg's ClientCursorMixin and AsyncServerCursor/ServerCursor.
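Roughly, the relationship looks like this (a sketch only; the ClientCursorMixin import path in psycopg 3 and the async class name are assumptions, not taken from the actual Django source):

import psycopg  # ServerCursor / AsyncServerCursor are public psycopg 3 classes
from psycopg.client_cursor import ClientCursorMixin  # assumed internal module path


class ServerSideCursor(ClientCursorMixin, psycopg.ServerCursor):
    """Named (server-side) cursor that keeps client-side parameter binding."""


class AsyncServerSideCursor(ClientCursorMixin, psycopg.AsyncServerCursor):
    """Hypothetical async counterpart built along the same lines."""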
stop = time.monotonic()
duration = stop - start
if use_last_executed_query:
    sql = self.db.ops.last_executed_query(self.cursor, sql, params)
Might this need to be async?
I've checked across all backends, and last_executed_query is a non-blocking method that basically only does string interpolation / decoding of a string that's already stored on the DBAPI cursor.
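Illustrative only: a helper that just formats strings does no I/O, so async code can call it without await.

# Hypothetical simplification (not any backend's real code): pure string work,
# no network round trip, so no await is needed when calling it.
def last_executed_query(cursor, sql, params):
    return sql % tuple(params) if params else sql

async def log_query(cursor, sql, params):
    print(last_executed_query(cursor, sql, params))  # plain sync call, no await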
class AsyncConnectionHandler:
    """
    Context-aware class to store async connections, mapped by alias name.
    """
This solution covers the existing requirements perfectly, but it also has side effects. I'll try to explain below.
example #1
async def get_authors(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            author.name
            async for author in Authors.objects.filter(name__icontains=pattern)
        ]

async def get_books(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]

async def my_view(request):
    # Query authors and books concurrently
    task_authors = asyncio.create_task(get_authors("an"))
    task_books = asyncio.create_task(get_books("di"))
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )
It will work perfectly.
example #2
async def my_view(request):
    task_authors = get_authors("an")
    task_books = get_books("di")
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )
It still works, but it looks odd because there's no reason to create a new connection for each sequential query. Anyway, we can do it.
example #3
async def get_authors(pattern):
    # without a new connection
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]

async def get_books(pattern):
    # create a new connection and put it on the stack
    async with db.new_connections():
        books = [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]
    # the new connection has been deleted from the stack here

    # delete the parent connection from the stack
    await async_connections['default'].close()
    return books
async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))
        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )
So I have full access to the parent connections (the connection stack) and can do whatever I want with them.
I am not a big fan of db.new_connections 😂, so for my "very important" view I'd get rid of db.new_connections and use a decorator instead:
import asyncio
import inspect
from functools import wraps

from django.db import async_connections


def concurrent_async_db(func):
    if not inspect.iscoroutinefunction(func):
        raise TypeError(
            "Function decorated with @concurrent_async_db must be an async function"
        )

    async def concurrent(*args, **kwargs):
        # Clean up or isolate the DB connection state before calling the real
        # function, or something like async_connections.refresh()
        async_connections['default'] = None
        return await func(*args, **kwargs)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # we use asyncio.create_task because without it a concurrent
        # connection doesn't make any sense
        return asyncio.create_task(concurrent(*args, **kwargs))

    return wrapper
@concurrent_async_db
async def get_authors(pattern):
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]
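If I'm reading the decorator right, a view would then look roughly like this (a sketch assuming get_books is decorated the same way as get_authors above):

from django.shortcuts import render

async def my_view(request):
    # Calling the decorated functions already schedules them as tasks,
    # so there is no explicit asyncio.create_task here.
    task_authors = get_authors("an")
    task_books = get_books("di")
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )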
Advantages:
- we don't have cases like example #2, because we don't create a connection explicitly
- we don't have access to the parent connections
Given this view:
async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))
        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )
I think the user should expect the two queries to happen sequentially. The code awaits task_books, then awaits again for task_authors. That's why the keyword is called await :) "Async" does not mean "in parallel".
We could also have a concurrent_async_db, but I think it should be a later addition.
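To make the distinction concrete, here is a minimal plain-asyncio sketch (no Django involved, illustrative only): awaiting coroutines one after another runs them sequentially, while wrapping them in tasks first lets them overlap.

import asyncio

async def query(label, delay):
    await asyncio.sleep(delay)  # stand-in for a database round trip
    return label

async def sequential():
    # ~2 seconds: the second coroutine only starts after the first finishes
    return [await query("authors", 1), await query("books", 1)]

async def concurrent():
    # ~1 second: both tasks are scheduled before either result is awaited
    task_a = asyncio.create_task(query("authors", 1))
    task_b = asyncio.create_task(query("books", 1))
    return [await task_a, await task_b]

print(asyncio.run(sequential()), asyncio.run(concurrent()))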
Sure, the current example works as expected. The problem is that with the existing requirement (I am talking about db.new_connections()), we generate a lot of problems (which I described above), and our end users have to keep all of that in mind to write code without side effects 😔
If our end goal is to give users the ability to run queries in parallel, then there are other approaches that don't have these problems.
PS: I really, really hate creating an explicit connection 😅
@@ -89,6 +92,8 @@ def _get_varchar_column(data):
class DatabaseWrapper(BaseDatabaseWrapper):
It seems like it would be better to implement a separate class for async.
Advantages:
- No mixed logic between sync and async connections
- Prevents creating a sync connection for an async wrapper and vice versa
- Allows us to keep using method names like execute, fetchone, etc., without adding an 'a' prefix
Disadvantages:
- We're relying on DatabaseWrapper in many places, as seen here. We need to be careful with it.
Here is an example of how we can implement an asynchronous connection handler: Arfey@6f13cc8
class AsyncConnectionHandler(BaseAsyncConnectionHandler):
    ...

    def create_connection(self, alias):
        db = self.settings[alias]
        backend = load_backend(db["ENGINE"])
        if not hasattr(backend, 'AsyncDatabaseWrapper'):
            raise self.exception_class(
                f"The async connection '{alias}' doesn't exist."
            )
        return backend.AsyncDatabaseWrapper(db, alias)
AsyncDatabaseWrapper is a separate database wrapper used only for asynchronous operations.
One issue that made me decide against separate classes is ease of use.
Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it.
With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-default alias, so they'll have to pass the alias every time they need an async connection.
Basically, they won't be able to call async with db.new_connections(). They'll have to call async with db.new_connections("my_alias") every time.
I am a bit conflicted. On one hand, I can see this as a case of 'explicit is better than implicit'. OTOH, one of the key features of the Django ORM is this kind of implicit connection management.
Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it. With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-default alias, so they'll have to pass the alias every time they need an async connection.
You are right 😌 100% agree with you. Let's consider an existing code base with async code:
await User.objects.aget(id=user_id)
So, currently the user interacts with our "async" ORM through models. Inside the model we can get the alias + the right connection without any changes on the user's side.
Current code:
class QuerySet(AltersData):
    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.query.get_count(using=self.db)

    async def acount(self):
        return await sync_to_async(self.count)()
self.db is our alias. Inside our model's methods, we need to do something like this:
class RawQuery:
    def _execute_query(self):
        connection = connections[self.using]
        ...

    async def _execute_query(self):
        connection = async_connections[self.using]
        ...
Looks easy to me 🙂
With two separate classes, they'd have to configure a separate backend - no big deal.
I thought about it, and I think I found an acceptable solution.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        # ...
    },
}
Inside django.db.backends.postgresql we put two classes, DatabaseWrapper and AsyncDatabaseWrapper. We also have two connection handlers, ConnectionHandler and AsyncConnectionHandler, and each of them returns the appropriate wrapper:
connections = ConnectionHandler()
async_connections = AsyncConnectionHandler()

connection = async_connections[DEFAULT_DB_ALIAS]
async with connection.cursor() as cursor:
    await cursor.execute("""select 1""")

connection = connections[DEFAULT_DB_ALIAS]
with connection.cursor() as cursor:
    cursor.execute("""select 1""")
It works (you can check it in my experimental branch). It follows the same style and works in the same way.
Does it make sense? 😊
If I'm understanding correctly, we'd have separate classes, and create_connection would pick the appropriate class from the module.
I don't have a strong opinion one way or the other. It seems just a different (possibly better) way to structure our code. For me it comes down to which one is easiest to maintain.
If I'm understanding correctly, we'd have separate classes, and create_connection would pick the appropriate class from the module.
correct
For me it comes down to which one is easiest to maintain.
Are we just talking about Django's internals here, or also about the projects built on top of it? 😌 Because DatabaseWrapper is a public interface.
That's a good point. Which way would be easier for 3rd-party packages to implement?
Hey 👋 I’ve prepared a small POC where I tried to separate the database wrappers for the sync and async variants. The idea was to explore an alternative to the current approach, where both sync and async methods are duplicated inside a single class (e.g., using acommit alongside commit). While the current solution works, I feel it makes the code harder to follow. Splitting it into two separate implementations introduces some duplication, but keeps the interface cleaner and easier to understand. When you have a moment, please take a look and share your thoughts: 🔗 https://github.com/Arfey/django/pull/1/files Thanks 😌
I've thought about the code generation described in this article, and we already have a script that will work well for us. It's easy peasy: just replace async and await with an empty string. But it has some consequences:
In our case, the sync class has a lot of code that we need for migrations, introspection, etc., and I've deleted it. Also, I don't understand how to deal with "not simple" async code like that, or asyncio.gather. That's why I've decided to prepare a version with full duplication (to avoid changing the existing sync code). As you can see, we don't have a lot of duplication compared with the current MR.
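For reference, a toy version of that "replace async and await" idea (similar in spirit to tools like unasync; a real script would also need to rename the aXxx methods and handle cases like asyncio.gather):

# Naive source-to-source transform: enough to show the idea, far from a
# complete solution for the tricky cases mentioned above.
REPLACEMENTS = [
    ("async def ", "def "),
    ("async with ", "with "),
    ("async for ", "for "),
    ("await ", ""),
]

def unasync_source(text: str) -> str:
    for old, new in REPLACEMENTS:
        text = text.replace(old, new)
    return text

print(unasync_source("async def acount(self):\n    return await self._count()\n"))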
Hi @Arfey, I appreciate your review! That's a lot of feedback, so it's going to take me a while to get through it!
I’ve moved the discussion about the explicit database connection to the Django forum to keep the code review thread more focused. 😊 Here’s the link to the forum post.
Co-authored-by: Mykhailo Havelia <Arfey17.mg@gmail.com>
        return self.conn

    async def __aexit__(self, exc_type, exc_value, traceback):
I think I’ve found an issue with the async context manager — everything works fine until an asyncio.CancelledError is raised.
Here's a short example to reproduce it.
import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Enter context")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Exiting context: start")
        await asyncio.sleep(2)
        print("Exiting context: end")

async def task_that_uses_resource():
    async with AsyncResource():
        await asyncio.sleep(1)

async def main():
    task = asyncio.create_task(task_that_uses_resource())

    # Let the task run a bit and then cancel it
    await asyncio.sleep(1.5)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled!")

asyncio.run(main())
results
Enter context
Exiting context: start
Cancelling the task...
Task was cancelled!
It’s clear why we don’t see Exiting context: end, but I’m not sure how to fix it quickly. This problem can lead to a leak of connections and incorrect nesting of connections due to pop_connection.
I'm trying to do something like this:
# use a sync context manager instead
@contextmanager
def independent_connection(self):
    active_connection = self.all(initialized_only=True)
    try:
        for conn in active_connection:
            del self[conn.alias]
        yield
    finally:
        # run async code in a separate coroutine without await
        asyncio.create_task(self.close_all())
        for conn in active_connection:
            self[conn.alias] = conn
But it still doesn't cover every case. @fcurella, any thoughts on how we can tackle this?
I wonder if shielding the exit logic would fix it? something like:
from asyncio import shield

class new_connection:
    ...

    async def __aexit__(self, exc_type, exc_value, traceback):
        await shield(self._aexit(exc_type, exc_value, traceback))

    async def _aexit(self, exc_type, exc_value, traceback):
        autocommit = await self.conn.aget_autocommit()
        if autocommit is False:
            if exc_type is None and self.force_rollback is False:
                await self.conn.acommit()
            else:
                await self.conn.arollback()
        await self.conn.aclose()
        await async_connections.pop_connection(self.using)
Could you write a test case to expose the bug?
I'm not entirely sure. Shielding seems to help with closing the connection, but I'm concerned it might not fix issues with broken nested ordering caused by pop_connection.
I'll try to write a unit test when I have some free time.
import asyncio

from django.db import new_connection, async_connections, DEFAULT_DB_ALIAS
from django.test import TransactionTestCase, skipUnlessDBFeature


@skipUnlessDBFeature("supports_async")
class NestedConnectionTest(TransactionTestCase):
    available_apps = ["async"]

    async def test_nesting_error1(self):
        async def aget_autocommit_new():
            raise asyncio.CancelledError

        async with new_connection() as conn1:
            assert async_connections.count == 1
            main_connection = async_connections.get_connection(DEFAULT_DB_ALIAS)
            aget_autocommit_old = conn1.aget_autocommit
            try:
                async with new_connection() as conn2:
                    assert async_connections.count == 2
                    conn2.aget_autocommit = aget_autocommit_new
            except:
                pass
            conn1.aget_autocommit = aget_autocommit_old
            assert main_connection == async_connections.get_connection(DEFAULT_DB_ALIAS)
            assert async_connections.count == 1
This test uses monkey patching, but it clearly demonstrates the root of the problem. Is it helpful for you?
======================================================================
FAIL: test_nesting_error1 (async.test_async_connections.NestedConnectionTest.test_nesting_error1)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/m.havelya/work/django/env/lib/python3.13/site-packages/asgiref/sync.py", line 325, in __call__
return call_result.result()
~~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/Users/m.havelya/work/django/env/lib/python3.13/site-packages/asgiref/sync.py", line 365, in main_wrap
result = await awaitable
^^^^^^^^^^^^^^^
File "/Users/m.havelya/work/django/tests/async/test_async_connections.py", line 31, in test_nesting_error1
assert main_connection == async_connections.get_connection(DEFAULT_DB_ALIAS)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
Thank you, that's really helpful!
We could wrap the __aexit__ body in a try ... finally block:
async def __aexit__(self, exc_type, exc_value, traceback):
    try:
        autocommit = await self.conn.aget_autocommit()
        if autocommit is False:
            if exc_type is None and self.force_rollback is False:
                await self.conn.acommit()
            else:
                await self.conn.arollback()
    finally:
        await self.conn.aclose()
        await async_connections.pop_connection(self.using)
We can get this problem during the aclose call as well.
When this is finalized and merged to master, will it also be added to the subsequent Django 5.2.x release?
Trac ticket number
ticket-35629
Branch description
This would cover the new new_connection context managers and provide an async cursor that the user could use.
The goal of this first phase is to give users a low-level async cursor that they can use for raw SQL queries.
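For illustration, usage would look roughly like this (a sketch pieced together from the examples in this thread; exact method names and import paths may differ in the final implementation):

# Assumed API surface based on snippets in this conversation.
from django.db import new_connection

async def fetch_one():
    async with new_connection() as connection:
        async with connection.cursor() as cursor:
            await cursor.execute("SELECT 1")
            return await cursor.fetchone()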
Manual transaction management, such as acommit and arollback, is in scope, but transaction.atomic is out of scope and will be addressed in a future Pull Request.
Checklist
This PR targets the main branch.