diff --git a/CHANGES.txt b/CHANGES.txt index 2f8df79c..614938cf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,13 @@ Changes for crate Unreleased ========== +- SQLAlchemy: Add compatibility adapter for SQLAlchemy's ``psycopg`` and ``asyncpg`` + dialects, introducing the ``crate+psycopg://``, ``crate+asyncpg://``, and + ``crate+urllib3://`` dialect identifiers. The asynchronous variant of ``psycopg`` + is also supported. +- SQLAlchemy: Add example demonstrating asynchronous streaming mode, using server-side + cursors + 2023/03/02 0.30.1 ================= diff --git a/examples/async_streaming.py b/examples/async_streaming.py new file mode 100644 index 00000000..1f1eb7d3 --- /dev/null +++ b/examples/async_streaming.py @@ -0,0 +1,172 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, this +time in asynchronous mode. + +Specific to the asynchronous mode of SQLAlchemy is the streaming of results: + +> The `AsyncConnection` also features a "streaming" API via the `AsyncConnection.stream()` +> method that returns an `AsyncResult` object. This result object uses a server-side cursor +> and provides an async/await API, such as an async iterator. +> +> -- https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + +Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised. +The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python examples/async_streaming.py psycopg + + # Use PostgreSQL protocol, with `asyncpg` + python examples/async_streaming.py asyncpg + + # Use with both variants + python examples/async_streaming.py psycopg asyncpg + +Bugs +==== + +When using the `psycopg` driver, the program currently croaks like:: + + sqlalchemy.exc.InternalError: (psycopg.errors.InternalError_) Cannot find portal: c_10479c0a0_1 + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + +metadata = sa.MetaData() +table = sa.Table( + "t1", + metadata, + sa.Column("id", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("name", sa.String), +) + + +class AsynchronousTableStreamingExample: + """ + Demonstrate reading streamed results when using the CrateDB SQLAlchemy + dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. + + - https://docs.sqlalchemy.org/en/20/orm/extensions/asyncio.html#synopsis-core + - https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, echo=True) + + async def run(self): + """ + Run the whole recipe. + """ + await self.create_and_insert() + await self.read_buffered() + await self.read_streaming() + + async def create_and_insert(self): + """ + Create table schema, completely dropping it upfront, and insert a few records. + """ + # conn is an instance of AsyncConnection + async with self.engine.begin() as conn: + # to support SQLAlchemy DDL methods as well as legacy functions, the + # AsyncConnection.run_sync() awaitable method will pass a "sync" + # version of the AsyncConnection object to any synchronous method, + # where synchronous IO calls will be transparently translated for + # await. + await conn.run_sync(metadata.drop_all, checkfirst=True) + await conn.run_sync(metadata.create_all) + + # for normal statement execution, a traditional "await execute()" + # pattern is used. + await conn.execute( + table.insert(), + [{"id": 1, "name": "some name 1"}, {"id": 2, "name": "some name 2"}], + ) + + # CrateDB specifics to flush/synchronize the write operation. + await conn.execute(sa.text("REFRESH TABLE t1;")) + + async def read_buffered(self): + """ + Read data from the database, in buffered mode. + """ + async with self.engine.connect() as conn: + # the default result object is the + # sqlalchemy.engine.Result object + result = await conn.execute(table.select()) + + # the results are buffered so no await call is necessary + # for this case. + print(result.fetchall()) + + async def read_streaming(self): + """ + Read data from the database, in streaming mode. + """ + async with self.engine.connect() as conn: + + # for a streaming result that buffers only segments of the + # result at time, the AsyncConnection.stream() method is used. + # this returns a sqlalchemy.ext.asyncio.AsyncResult object. + async_result = await conn.stream(table.select()) + + # this object supports async iteration and awaitable + # versions of methods like .all(), fetchmany(), etc. + async for row in async_result: + print(row) + + +async def run_example(dsn: str): + example = AsynchronousTableStreamingExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + await example.run() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + elif driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + run_drivers(drivers) diff --git a/examples/async_table.py b/examples/async_table.py new file mode 100644 index 00000000..ea3ae89d --- /dev/null +++ b/examples/async_table.py @@ -0,0 +1,193 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API, this +time in asynchronous mode. + +Both the PostgreSQL drivers based on `psycopg` and `asyncpg` are exercised. +The corresponding SQLAlchemy dialect identifiers are:: + + # PostgreSQL protocol on port 5432, using `psycopg` + crate+psycopg://crate@localhost:5432/doc + + # PostgreSQL protocol on port 5432, using `asyncpg` + crate+asyncpg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use PostgreSQL protocol, with asynchronous support of `psycopg` + python examples/async_table.py psycopg + + # Use PostgreSQL protocol, with `asyncpg` + python examples/async_table.py asyncpg + + # Use with both variants + python examples/async_table.py psycopg asyncpg + +""" +import asyncio +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa +from sqlalchemy.ext.asyncio import create_async_engine + + +class AsynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect in asynchronous mode with the `psycopg` and `asyncpg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return create_async_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + async def conn_run_sync(self, func: t.Callable, *args, **kwargs): + """ + To support SQLAlchemy DDL methods as well as legacy functions, the + AsyncConnection.run_sync() awaitable method will pass a "sync" + version of the AsyncConnection object to any synchronous method, + where synchronous IO calls will be transparently translated for + await. + + https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/basic.html + """ + # `conn` is an instance of `AsyncConnection` + async with self.engine.begin() as conn: + return await conn.run_sync(func, *args, **kwargs) + + async def run(self): + """ + Run the whole recipe, returning the result from the "read" step. + """ + await self.create() + await self.insert(sync=True) + return await self.read() + + async def create(self): + """ + Create table schema, completely dropping it upfront. + """ + await self.conn_run_sync(self.table.drop, checkfirst=True) + await self.conn_run_sync(self.table.create) + + async def insert(self, sync: bool = False): + """ + Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + async with self.engine.begin() as conn: + stmt = self.table.insert().values(x=1, y=42) + await conn.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + await conn.execute(stmt) + if sync and self.dsn.startswith("crate"): + await conn.execute(sa.text("REFRESH TABLE testdrive;")) + + async def read(self): + """ + Read data from the database. + """ + async with self.engine.begin() as conn: + cursor = await conn.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + async def reflect(self): + """ + Reflect the table schema from the database. + """ + + # Debugging. + # self.trace() + + def reflect(session): + """ + A function written in "synchronous" style that will be invoked + within the asyncio event loop. + + The session object passed is a traditional orm.Session object with + synchronous interface. + + https://docs.sqlalchemy.org/en/20/_modules/examples/asyncio/greenlet_orm.html + """ + meta = sa.MetaData() + reflected_table = sa.Table("testdrive", meta, autoload_with=session) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + return await self.conn_run_sync(reflect) + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +async def run_example(dsn: str): + example = AsynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = await example.run() + print(result) + + # Reflect the table schema. + await example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + elif driver == "asyncpg": + dsn = "crate+asyncpg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + asyncio.run(run_example(dsn)) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/examples/sync_table.py b/examples/sync_table.py new file mode 100644 index 00000000..d048e702 --- /dev/null +++ b/examples/sync_table.py @@ -0,0 +1,165 @@ +""" +About +===== + +Example program to demonstrate how to connect to CrateDB using its SQLAlchemy +dialect, and exercise a few basic examples using the low-level table API. + +Both the HTTP driver based on `urllib3`, and the PostgreSQL driver based on +`psycopg` are exercised. The corresponding SQLAlchemy dialect identifiers are:: + + # CrateDB HTTP API on port 4200 + crate+urllib3://localhost:4200/doc + + # PostgreSQL protocol on port 5432 + crate+psycopg://crate@localhost:5432/doc + +Synopsis +======== +:: + + # Run CrateDB + docker run --rm -it --publish=4200:4200 --publish=5432:5432 crate + + # Use HTTP API + python examples/sync_table.py urllib3 + + # Use PostgreSQL protocol + python examples/sync_table.py psycopg + + # Use with both variants + python examples/sync_table.py urllib3 psycopg + +""" +import sys +import typing as t +from functools import lru_cache + +import sqlalchemy as sa + + +class SynchronousTableExample: + """ + Demonstrate the CrateDB SQLAlchemy dialect with the `urllib3` and `psycopg` drivers. + """ + + def __init__(self, dsn: str): + self.dsn = dsn + + @property + @lru_cache + def engine(self): + """ + Provide an SQLAlchemy engine object. + """ + return sa.create_engine(self.dsn, isolation_level="AUTOCOMMIT", echo=True) + + @property + @lru_cache + def table(self): + """ + Provide an SQLAlchemy table object. + """ + metadata = sa.MetaData() + return sa.Table( + "testdrive", + metadata, + # TODO: When omitting `autoincrement`, SA's DDL generator will use `SERIAL`. + # (psycopg.errors.InternalError_) Cannot find data type: serial + # This is probably one more thing to redirect to the CrateDialect. + sa.Column("x", sa.Integer, primary_key=True, autoincrement=False), + sa.Column("y", sa.Integer), + ) + + def run(self): + """ + Run the whole recipe, returning the result from the "read" step. + """ + self.create() + self.insert(sync=True) + return self.read() + + def create(self): + """ + Create table schema, completely dropping it upfront. + """ + self.table.drop(bind=self.engine, checkfirst=True) + self.table.create(bind=self.engine) + + def insert(self, sync: bool = False): + """ + Write data from the database, taking CrateDB-specific `REFRESH TABLE` into account. + """ + with self.engine.begin() as session: + stmt = self.table.insert().values(x=1, y=42) + session.execute(stmt) + stmt = self.table.insert().values(x=2, y=42) + session.execute(stmt) + if sync and self.dsn.startswith("crate"): + session.execute(sa.text("REFRESH TABLE testdrive;")) + + def read(self): + """ + Read data from the database. + """ + with self.engine.begin() as session: + cursor = session.execute(sa.text("SELECT * FROM testdrive;")) + return cursor.fetchall() + + def reflect(self): + """ + Reflect the table schema from the database. + """ + meta = sa.MetaData() + # Debugging. + # self.trace() + reflected_table = sa.Table("testdrive", meta, autoload_with=self.engine) + print("Table information:") + print(f"Table: {reflected_table}") + print(f"Columns: {reflected_table.columns}") + print(f"Constraints: {reflected_table.constraints}") + print(f"Primary key: {reflected_table.primary_key}") + + @staticmethod + def trace(): + """ + Trace execution flow through SQLAlchemy. + + pip install hunter + """ + from hunter import Q, trace + + constraint = Q(module_startswith="sqlalchemy") + trace(constraint) + + +def run_example(dsn: str): + example = SynchronousTableExample(dsn) + + # Run a basic conversation. + # It also includes a catalog inquiry at `table.drop(checkfirst=True)`. + result = example.run() + print(result) + + # Reflect the table schema. + # example.reflect() + + +def run_drivers(drivers: t.List[str]): + for driver in drivers: + if driver == "urllib3": + dsn = "crate+urllib3://localhost:4200/doc" + elif driver == "psycopg": + dsn = "crate+psycopg://crate@localhost:5432/doc" + else: + raise ValueError(f"Unknown driver: {driver}") + + run_example(dsn) + + +if __name__ == "__main__": + + drivers = sys.argv[1:] + if not drivers: + raise ValueError("Please select driver") + run_drivers(drivers) diff --git a/setup.py b/setup.py index 3d465324..63728df8 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,11 @@ def read(path): namespace_packages=['crate'], entry_points={ 'sqlalchemy.dialects': [ - 'crate = crate.client.sqlalchemy:CrateDialect' + 'crate = crate.client.sqlalchemy:CrateDialect', + 'crate.urllib3 = crate.client.sqlalchemy.dialect_more:dialect_urllib3', + 'crate.psycopg = crate.client.sqlalchemy.dialect_more:dialect_psycopg', + 'crate.psycopg_async = crate.client.sqlalchemy.dialect_more:dialect_psycopg_async', + 'crate.asyncpg = crate.client.sqlalchemy.dialect_more:dialect_asyncpg', ] }, install_requires=['urllib3>=1.9,<2'], @@ -63,6 +67,7 @@ def read(path): sqlalchemy=['sqlalchemy>=1.0,<2.1', 'geojson>=2.5.0,<4', 'backports.zoneinfo<1; python_version<"3.9"'], + postgresql=['sqlalchemy-postgresql-relaxed'], test=['tox>=3,<5', 'zope.testing>=4,<6', 'zope.testrunner>=5,<6', diff --git a/src/crate/client/sqlalchemy/dialect.py b/src/crate/client/sqlalchemy/dialect.py index 7d2ee159..b21204f0 100644 --- a/src/crate/client/sqlalchemy/dialect.py +++ b/src/crate/client/sqlalchemy/dialect.py @@ -21,6 +21,7 @@ import logging from datetime import datetime, date +from types import ModuleType from sqlalchemy import types as sqltypes from sqlalchemy.engine import default, reflection @@ -191,6 +192,12 @@ def initialize(self, connection): self.default_schema_name = \ self._get_default_schema_name(connection) + def set_isolation_level(self, dbapi_connection, level): + """ + For CrateDB, this is implemented as a noop. + """ + pass + def do_rollback(self, connection): # if any exception is raised by the dbapi, sqlalchemy by default # attempts to do a rollback crate doesn't support rollbacks. @@ -209,7 +216,21 @@ def connect(self, host=None, port=None, *args, **kwargs): use_ssl = asbool(kwargs.pop("ssl", False)) if use_ssl: servers = ["https://" + server for server in servers] - return self.dbapi.connect(servers=servers, **kwargs) + + is_module = isinstance(self.dbapi, ModuleType) + if is_module: + driver_name = self.dbapi.__name__ + else: + driver_name = self.dbapi.__class__.__name__ + if driver_name == "crate.client": + if "database" in kwargs: + del kwargs["database"] + return self.dbapi.connect(servers=servers, **kwargs) + elif driver_name in ["psycopg", "PsycopgAdaptDBAPI", "AsyncAdapt_asyncpg_dbapi"]: + return self.dbapi.connect(host=host, port=port, **kwargs) + else: + raise ValueError(f"Unknown driver variant: {driver_name}") + return self.dbapi.connect(**kwargs) def _get_default_schema_name(self, connection): @@ -244,11 +265,11 @@ def get_schema_names(self, connection, **kw): @reflection.cache def get_table_names(self, connection, schema=None, **kw): - cursor = connection.exec_driver_sql( + cursor = connection.exec_driver_sql(self._format_query( "SELECT table_name FROM information_schema.tables " "WHERE {0} = ? " "AND table_type = 'BASE TABLE' " - "ORDER BY table_name ASC, {0} ASC".format(self.schema_column), + "ORDER BY table_name ASC, {0} ASC").format(self.schema_column), (schema or self.default_schema_name, ) ) return [row[0] for row in cursor.fetchall()] @@ -270,7 +291,7 @@ def get_columns(self, connection, table_name, schema=None, **kw): "AND column_name !~ ?" \ .format(self.schema_column) cursor = connection.exec_driver_sql( - query, + self._format_query(query), (table_name, schema or self.default_schema_name, r"(.*)\[\'(.*)\'\]") # regex to filter subscript @@ -309,7 +330,7 @@ def result_fun(result): return set(rows[0] if rows else []) pk_result = engine.exec_driver_sql( - query, + self._format_query(query), (table_name, schema or self.default_schema_name) ) pks = result_fun(pk_result) @@ -343,6 +364,15 @@ def _create_column_info(self, row): def _resolve_type(self, type_): return TYPES_MAP.get(type_, sqltypes.UserDefinedType) + def _format_query(self, query): + """ + When using the PostgreSQL protocol with drivers `psycopg` or `asyncpg`, + the paramstyle is not `qmark`, but `pyformat`. + """ + if self.paramstyle == "pyformat": + query = query.replace("= ?", "= %s").replace("!~ ?", "!~ %s") + return query + class DateTrunc(functions.GenericFunction): name = "date_trunc" diff --git a/src/crate/client/sqlalchemy/dialect_more.py b/src/crate/client/sqlalchemy/dialect_more.py new file mode 100644 index 00000000..5029a76f --- /dev/null +++ b/src/crate/client/sqlalchemy/dialect_more.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. +from sqlalchemy import Inspector +from sqlalchemy_postgresql_relaxed.asyncpg import PGDialect_asyncpg_relaxed +from sqlalchemy_postgresql_relaxed.base import PGDialect_relaxed +from sqlalchemy_postgresql_relaxed.psycopg import ( + PGDialect_psycopg_relaxed, + PGDialectAsync_psycopg_relaxed, +) + +from crate.client.sqlalchemy import CrateDialect + + +class CrateDialectPostgresAdapter(PGDialect_relaxed, CrateDialect): + """ + Provide a CrateDialect on top of the relaxed PostgreSQL dialect. + """ + + inspector = Inspector + + # Need to manually override some methods because of polymorphic inheritance woes. + # TODO: Investigate if this can be solved using metaprogramming or other techniques. + has_schema = CrateDialect.has_schema + has_table = CrateDialect.has_table + get_schema_names = CrateDialect.get_schema_names + get_table_names = CrateDialect.get_table_names + get_view_names = CrateDialect.get_view_names + get_columns = CrateDialect.get_columns + get_pk_constraint = CrateDialect.get_pk_constraint + get_foreign_keys = CrateDialect.get_foreign_keys + get_indexes = CrateDialect.get_indexes + + get_multi_columns = CrateDialect.get_multi_columns + get_multi_pk_constraint = CrateDialect.get_multi_pk_constraint + get_multi_foreign_keys = CrateDialect.get_multi_foreign_keys + + # TODO: Those may want to go to CrateDialect instead? + def get_multi_indexes(self, *args, **kwargs): + return [] + + def get_multi_unique_constraints(self, *args, **kwargs): + return [] + + def get_multi_check_constraints(self, *args, **kwargs): + return [] + + def get_multi_table_comment(self, *args, **kwargs): + return [] + + +class CrateDialect_psycopg(PGDialect_psycopg_relaxed, CrateDialectPostgresAdapter): + driver = "psycopg" + + @classmethod + def get_async_dialect_cls(cls, url): + return CrateDialectAsync_psycopg + + @classmethod + def import_dbapi(cls): + import psycopg + + return psycopg + + +class CrateDialectAsync_psycopg(PGDialectAsync_psycopg_relaxed, CrateDialectPostgresAdapter): + driver = "psycopg_async" + is_async = True + + +class CrateDialect_asyncpg(PGDialect_asyncpg_relaxed, CrateDialectPostgresAdapter): + driver = "asyncpg" + + # TODO: asyncpg may have `paramstyle="numeric_dollar"`. Review this! + + # TODO: AttributeError: module 'asyncpg' has no attribute 'paramstyle' + """ + @classmethod + def import_dbapi(cls): + import asyncpg + + return asyncpg + """ + + +dialect_urllib3 = CrateDialect +dialect_psycopg = CrateDialect_psycopg +dialect_psycopg_async = CrateDialectAsync_psycopg +dialect_asyncpg = CrateDialect_asyncpg