Skip to content

Fixed #35629 -- Added support for async database connections and cursors. #18408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

fcurella
Copy link
Contributor

@fcurella fcurella commented Jul 24, 2024

Trac ticket number

ticket-35629

Branch description

This would cover the new new_connection context managers, and provide an async cursor that the user could use.

The goal of this first phase is to give users a low-level async cursor that they can use for raw SQL queries

Manual transaction management, such as acommit and arollback is in scope, but transaction.atomic is out of scope and will be addressed in a future Pull Request.

Checklist

  • This PR targets the main branch.
  • The commit message is written in past tense, mentions the ticket number, and ends with a period.
  • I have checked the "Has patch" ticket flag in the Trac system.
  • I have added or updated relevant tests.
  • I have added or updated relevant docs, including release notes if applicable.
  • I have attached screenshots in both light and dark modes for any UI changes.

@fcurella fcurella changed the title Fixed #35629. Async DB Connection and Cursor Draft: Fixed #35629. Async DB Connection and Cursor Jul 24, 2024
@fcurella fcurella force-pushed the async-orm-cursor branch 6 times, most recently from 3a3fc57 to e6bbcd1 Compare July 25, 2024 20:03
@fcurella fcurella changed the title Draft: Fixed #35629. Async DB Connection and Cursor Fixed #35629. Async DB Connection and Cursor Jul 25, 2024
@fcurella fcurella force-pushed the async-orm-cursor branch 5 times, most recently from 1b81214 to b42cc45 Compare August 9, 2024 13:47
@carltongibson
Copy link
Member

@fcurella Thanks for pushing this! I'm going to have a potter on it, and I'll see if @felixxm has a cycle too.

First glance it looks very minimal. 👍

Copy link
Contributor

@bigfootjon bigfootjon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sufficiently familiar with the internals of the ORM/database layer to comment on much of the internal logic here, but the general shape of this code and approach seems to make sense to me

(left a few questions, nothing terribly blocking)

ServerCursor implements the logic required to declare and scroll
through named cursors.

Mixing ClientCursorMixin in wouldn't be necessary if Cursor allowed to
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps out of scope for this PR, but I don't think it's clear where this Cursor comes from. I think it's defined in psycopg3, but I'm not confident in that.

This should also be clarified in the ServerSideCursor docstring above too.

Copy link
Contributor Author

@fcurella fcurella Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anything under Database. comes from psycopg2 or psycopg3.

In this specific case, ClientCursonMixin and AsyncClientCursor come from psycopg3

ServerSideCursor Is our custom class that derives from postgres' ClientCursorMixin and AsyncServerCursor/ServerCursor

stop = time.monotonic()
duration = stop - start
if use_last_executed_query:
sql = self.db.ops.last_executed_query(self.cursor, sql, params)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might this need to be async?

Copy link
Contributor Author

@fcurella fcurella Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've checked across all backends, and last_executed_query is a non-blocking method that basically only does string interpolation / decoding of a string that's already stored on the DBAPI cursor.

@fcurella fcurella force-pushed the async-orm-cursor branch 2 times, most recently from 259d10c to 53fae35 Compare October 22, 2024 15:26
@fcurella fcurella force-pushed the async-orm-cursor branch 3 times, most recently from efafc58 to 49141bb Compare October 24, 2024 14:24
class AsyncConnectionHandler:
"""
Context-aware class to store async connections, mapped by alias name.
"""
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution perfectly covers existing requirements. But besides that, we have side effects. I'll try to explain below

example #1

async def get_authors(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            author.name
            async for author in Authors.objects.filter(name__icontains=pattern)
        ]

async def get_books(pattern):
    # Create a new context to call concurrently
    async with db.new_connections():
        return [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]

async def my_view(request):
    # Query authors and books concurrently
    task_authors = asyncio.create_task(get_authors("an"))
    task_books = asyncio.create_task(get_books("di"))
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )

it will work perfectly.

example #2

async def my_view(request):
    task_authors = get_authors("an")
    task_books = get_books("di")
    return render(
        request,
        "template.html",
        {
            "books": await task_books,
            "authors": await task_authors,
        },
    )

it still works but looks odd coz we don't have any reason to create a new connection for each sequential request. Anyway, we can do it.

example #3

async def get_authors(pattern):
    # without new connection
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]

async def get_books(pattern):
    # create new connection and put it on stack
    async with db.new_connections():
        return [
            book.title
            async for book in Book.objects.filter(name__icontains=pattern)
        ]
        # delete connection from the stack

    # delete parent connection from the stack
    await async_connections['default'].close()

async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))

        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )

so I have full access to the parent connections (connection stack) and can do everything what I want with it.

I am not a big fan of db.new_connections 😂 so about my "very important" view

get rid of db.new_connections and use decorator instead

def concurrent_async_db(func):
    if not inspect.iscoroutinefunction(func):
        raise TypeError("Function decorated with @concurrent_async_db must be an async function")

    async def concurrent(*args, **kwargs):
        # Clean up or isolate DB connection state before calling the real function
        # or something like this async_connections.refresh()
        async_connections['default'] = None
        return await func(*args, **kwargs)

    @wraps(func)
    def wrapper(*args, **kwargs):
        # we use asyncio.create_task coz without it, the concurrent connection doesn't have any sense
        return asyncio.create_task(concurrent(*args, **kwargs))

    return wrapper

@concurrent_async_db
async def get_authors(pattern):
    return [
        author.name
        async for author in Authors.objects.filter(name__icontains=pattern)
    ]

adv

  • we don't have cases as, for example #2 coz we don't create a connection explicit
  • we don't have access to the parent connections

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given this view:

async def my_view(request):
    async with db.new_connections():
        # Query authors and books concurrently
        task_authors = asyncio.create_task(get_authors("an"))
        task_books = asyncio.create_task(get_books("di"))

        return render(
            request,
            "template.html",
            {
                "books": await task_books,
                "authors": await task_authors,
            },
        )

I think the user should expect the two queries to happen sequentially. The code awaits for task_books, then awaits again for task_authors. That's why the keyword is called await :) "Async" does not mean "in parallel".

We could also have a concurrent_async_db, but I think it should be a later addition.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the current example works as expected. The problem is that with the existing requirement (I am talking about db.new_connections()), we generate a lot of problems (which I described above), and our end users have to keep in mind all of that to write code without side effects 😔

If our end goal is to give the ability to run queries in parallel, then we have other approaches that don't have the current problems

ps: I really really hate creating an explicit connection 😅

@@ -89,6 +92,8 @@ def _get_varchar_column(data):
class DatabaseWrapper(BaseDatabaseWrapper):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it would be better to implement a separate class for async.

Advantages:

  • No mixed logic between sync and async connections
  • Prevents creating a sync connection for an async wrapper and vice versa
  • Allows us to keep using method names like execute, fetchone, etc., without adding an 'a' prefix

Disadvantages:

  • We're relying on DatabaseWrapper in many places, as seen here. We need to be careful with it

example how we can implement asynchronous connection handler Arfey@6f13cc8

class AsyncConnectionHandler(BaseAsyncConnectionHandler):
  
    ...

    def create_connection(self, alias):
         db = self.settings[alias]
         backend = load_backend(db["ENGINE"])

         if not hasattr(backend, 'AsyncDatabaseWrapper'):
             raise self.exception_class(
                 f"The async connection '{alias}' doesn't exist."
             )

         return backend.AsyncDatabaseWrapper(db, alias)

AsyncDatabaseWrapper - separate database wrapper only for asynchronous operations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue that made me decide against separate classes is easy-of-use.

Consider that most users will be starting from an existing sync codebase, and they'd want to async just parts of it.

With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-defaul alias, so they'll have to pass the alias every time they need an async connnection.

Basically, they won't be able to call async with db.new_connections(). They'll have to call async with db.new_connections("my_alias") every time.

I am a bit conflicted. On one hand, I can see this as a case of 'explicit better than implicit'. OTOH, One of the key features of the Django ORM is this kind of implicit connection management.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider that most users will be starting from an existing sync codebase, and they’d want to async just parts of it. With two separate classes, they'd have to configure a separate backend - no big deal. But now the async features are on the non-defaul alias, so they'll have to pass the alias every time they need an async connnection.

You are right 😌 100% agree with you. Let’s consider the existing code base with async code

await User.objects.aget(id=user_id)

So, current user interacts with our “async” ORM over models. Inside model we can get alias + right connection without changes from the user side.

Current code

class QuerySet(AltersData):
    def count(self):
        if self._result_cache is not None:
            return len(self._result_cache)

        return self.query.get_count(using=self.db)

    async def acount(self):
        return await sync_to_async(self.count)()

self.db is our alias

Inside our model's method, we need to do something like that

class RawQuery:

    def _execute_query(self):
        connection = connections[self.using]
        ...

    async def _execute_query(self):
        connection = async_connections[self.using]
        ...

Looks easy to me 🙂

With two separate classes, they'd have to configure a separate backend - no big deal.

I thought about it, and I think I found an acceptable solution.

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        # ...
    },
}

Inside django.db.backends.postgresql We put 2 classes, DatabaseWrapper and AsyncDatabaseWrapper. Also, we have 2 connections, ConnectionHandler and AsyncConnectionHandler, for each of them, we return the appropriate wrapper

connections = ConnectionHandler()
async_connections = AsyncConnectionHandler()
connection = async_connections[DEFAULT_DB_ALIAS]

async with connection.cursor() as cursor:
    await cursor.execute("""select 1""")
connection = connections[DEFAULT_DB_ALIAS]

with connection.cursor() as cursor:
    cursor.execute("""select 1""")

It works (u can check it in my experimental branch). It follows the same style and works in the same way.

Does it make sense? 😊

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding correctly, we'd have separate classes, and create_connection would pick the appropriate class from the module.

I don't have a strong opinion one way or the other. It seems just a different (possibly better) way to structure our code. For me it comes down to which one is easiest to maintain.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding correctly, we'd have separate classes, and create_connection would pick the appropriate class from the module.

correct

For me it comes down to which one is easiest to maintain.

Are we just talking about Django's internals here, or also about the projects built on top of it? 😌 Because DatabaseWrapper it's a public interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. Which way would be easier for 3rd-party packages to implement?

@Arfey
Copy link

Arfey commented Apr 22, 2025

@fcurella @carltongibson

Hey 👋

I’ve prepared a small POC where I tried to separate the database wrappers for the sync and async variants.

The idea was to explore an alternative to the current approach, where both sync and async methods are duplicated inside a single class (e.g., using acommit alongside commit). While the current solution works, I feel it makes the code harder to follow. Splitting it into two separate implementations introduces some duplication, but keeps the interface cleaner and easier to understand.

When you have a moment, please take a look and share your thoughts:

🔗 https://github.com/Arfey/django/pull/1/files

Thanks 😌

ps: @fcurella It can be interesting for u 😀

@fcurella
Copy link
Contributor Author

fcurella commented Apr 22, 2025

@Arfey I think more duplication is acceptable IF we write in a such a way that we can use codegen to maintain it. @rtpg can you look at @Arfey 's code and see if codegen could work with that?

@Arfey
Copy link

Arfey commented Apr 22, 2025

@Arfey I think more duplication is acceptable IF we write in a such a way that we can use codegen to maintain it. @rtpg can you look at @Arfey 's code and see if codegen could work with that?

I've thought about the code generation described in this article, and we already have a script that will work well for us. It's easy peasy. Just replace async and await with an empty string.

But we have some consequences:

  • We need to write async code first and generate sync code from it
  • We need to have a mirror implementation for sync and asynchronous versions

In our case in sync class we have a lot of code which we need to make migration, introspection and etc, and I've deleted it.

also i dont understand how to deal with "not simple" async code like that, or async.gather

That's why I've decided to prepare a version with full duplication (to avoid changing the existing sync code). As u can see, we don't have a lot of duplication if compared with the current MR.

@fcurella
Copy link
Contributor Author

fcurella commented May 5, 2025

Hi @Arfey ,

I appreciate your review! that's a lot of feedback, so it's going to take me a while to get through it!

@fcurella fcurella force-pushed the async-orm-cursor branch 6 times, most recently from e7727eb to 126f15a Compare May 5, 2025 19:02
@Arfey
Copy link

Arfey commented May 8, 2025

I’ve moved the discussion about the explicit database connection to the Django forum to keep the code review thread more focused. 😊

Here’s the link to the forum post.

@fcurella fcurella force-pushed the async-orm-cursor branch from 126f15a to fd3053b Compare May 23, 2025 14:10

return self.conn

async def __aexit__(self, exc_type, exc_value, traceback):
Copy link

@Arfey Arfey May 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I’ve found an issue with an async context manager — everything works fine until an asyncio.CancelledError is raised.

Here's a short example to reproduce it.

import asyncio

class AsyncResource:
    async def __aenter__(self):
        print("Enter context")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print("Exiting context: start")
        await asyncio.sleep(2)
        print("Exiting context: end")


async def task_that_uses_resource():
    async with AsyncResource():
        await asyncio.sleep(1)


async def main():
    task = asyncio.create_task(task_that_uses_resource())

    # Let the task run a bit and then cancel it
    await asyncio.sleep(1.5)
    print("Cancelling the task...")
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled!")


asyncio.run(main())

results

Enter context
Exiting context: start
Cancelling the task...
Task was cancelled!

It’s clear why we don’t see Exiting context: end, but I’m not sure how to fix it quickly. This problem can lead to a leak of connections and incorrect nesting of connections due to pop_connection.

I'm trying to do something like that

# use sync context manager instead
@contextmanager
def independent_connection(self):
    active_connection = self.all(initialized_only=True)

    try:
        for conn in active_connection:
            del self[conn.alias]
        yield
    finally:
        # run async code in a separate coroutine without await
        asyncio.create_task(self.close_all())
        for conn in active_connection:
            self[conn.alias] = conn

But it still doesn't cover every case. @fcurella, any thoughts on how we can to tackle this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if shielding the exit logic would fix it? something like:

from asyncio import shield


class new_connection:

    ...

    async def __aexit__(self, exc_type, exc_value, traceback):
        await shield(self._aexit(exc_type, exc_value, traceback))

    async def _aexit(self, exc_type, exc_value, traceback):
        autocommit = await self.conn.aget_autocommit()
        if autocommit is False:
            if exc_type is None and self.force_rollback is False:
                await self.conn.acommit()
            else:
                await self.conn.arollback()
        await self.conn.aclose()

        await async_connections.pop_connection(self.using)

Could you write a test case to expose the bug?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely sure. Shielding seems to help with closing the connection, but I'm concerned it might not fix issues with broken nested ordering caused by pop_connection.

I'll try to write a unit test when I have some free time.

Copy link

@Arfey Arfey Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import asyncio

from django.db import new_connection, async_connections, DEFAULT_DB_ALIAS
from django.test import TransactionTestCase, skipUnlessDBFeature



@skipUnlessDBFeature("supports_async")
class NestedConnectionTest(TransactionTestCase):

    available_apps = ["async"]

    async def test_nesting_error1(self):
        async def aget_autocommit_new():
            raise asyncio.CancelledError

        async with new_connection() as conn1:
            assert async_connections.count == 1
            main_connection = async_connections.get_connection(DEFAULT_DB_ALIAS)

            aget_autocommit_old = conn1.aget_autocommit
            try:
                async with new_connection() as conn2:
                    assert async_connections.count == 2
                    conn2.aget_autocommit = aget_autocommit_new
            except:
                pass

            conn1.aget_autocommit = aget_autocommit_old

            assert main_connection == async_connections.get_connection(DEFAULT_DB_ALIAS)
            assert async_connections.count == 1

This test contains monkey_patch, but it's clearly demonstrating the root of the problems. Is it helpful for you?

======================================================================
FAIL: test_nesting_error1 (async.test_async_connections.NestedConnectionTest.test_nesting_error1)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/m.havelya/work/django/env/lib/python3.13/site-packages/asgiref/sync.py", line 325, in __call__
    return call_result.result()
           ~~~~~~~~~~~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ~~~~~~~~~~~~~~~~~^^
  File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/m.havelya/work/django/env/lib/python3.13/site-packages/asgiref/sync.py", line 365, in main_wrap
    result = await awaitable
             ^^^^^^^^^^^^^^^
  File "/Users/m.havelya/work/django/tests/async/test_async_connections.py", line 31, in test_nesting_error1
    assert main_connection == async_connections.get_connection(DEFAULT_DB_ALIAS)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, that's really helpful!

We could wrap the __exit__ in a try ... finally block:

    async def __aexit__(self, exc_type, exc_value, traceback):
        try:
            autocommit = await self.conn.aget_autocommit()
            if autocommit is False:
                if exc_type is None and self.force_rollback is False:
                    await self.conn.acommit()
                else:
                    await self.conn.arollback()
        finally:
            await self.conn.aclose()
            await async_connections.pop_connection(self.using)

Copy link

@Arfey Arfey Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can get this problem during aclose call as well

@robd003
Copy link

robd003 commented May 31, 2025

When this is finalized and merged to master will it also be added to the subsequent Django 5.2.x release?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants