From 7be41e2baba513644998f238e8c06c74dafed989 Mon Sep 17 00:00:00 2001 From: Matthias Fetzer Date: Mon, 15 May 2023 20:15:39 +0200 Subject: [PATCH 01/18] Allow turning off the column store --- src/crate/client/sqlalchemy/compiler.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/crate/client/sqlalchemy/compiler.py b/src/crate/client/sqlalchemy/compiler.py index 7e6dad7d..69493abc 100644 --- a/src/crate/client/sqlalchemy/compiler.py +++ b/src/crate/client/sqlalchemy/compiler.py @@ -128,6 +128,9 @@ def get_column_specification(self, column, **kwargs): colspec += " INDEX OFF" + if column.dialect_options['crate'].get('columnstore') is False: + colspec += " STORAGE WITH (columnstore = false)" + return colspec def visit_computed_column(self, generated): From c4cedf3eea49c56ba26daaabfb53176cb9011e7c Mon Sep 17 00:00:00 2001 From: Matthias Fetzer Date: Mon, 29 May 2023 11:08:26 +0200 Subject: [PATCH 02/18] Columnstore: Add tests and error handling for non-TEXT columns --- src/crate/client/sqlalchemy/compiler.py | 6 +++ .../sqlalchemy/tests/create_table_test.py | 37 ++++++++++++++++++- 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/crate/client/sqlalchemy/compiler.py b/src/crate/client/sqlalchemy/compiler.py index 69493abc..efa88d17 100644 --- a/src/crate/client/sqlalchemy/compiler.py +++ b/src/crate/client/sqlalchemy/compiler.py @@ -25,6 +25,7 @@ import sqlalchemy as sa from sqlalchemy.dialects.postgresql.base import PGCompiler from sqlalchemy.sql import compiler +from sqlalchemy.types import String from .types import MutableDict, _Craty, Geopoint, Geoshape from .sa_version import SA_VERSION, SA_1_4 @@ -129,6 +130,11 @@ def get_column_specification(self, column, **kwargs): colspec += " INDEX OFF" if column.dialect_options['crate'].get('columnstore') is False: + if not isinstance(column.type, (String, )): + raise sa.exc.CompileError( + "Controlling the columnstore is only allowed for STRING columns" + ) + colspec += " STORAGE WITH (columnstore = false)" return colspec diff --git a/src/crate/client/sqlalchemy/tests/create_table_test.py b/src/crate/client/sqlalchemy/tests/create_table_test.py index 7eca2628..91f54bca 100644 --- a/src/crate/client/sqlalchemy/tests/create_table_test.py +++ b/src/crate/client/sqlalchemy/tests/create_table_test.py @@ -20,6 +20,7 @@ # software solely pursuant to the terms of the relevant commercial agreement.
import sqlalchemy as sa + try: from sqlalchemy.orm import declarative_base except ImportError: @@ -31,7 +32,6 @@ from unittest import TestCase from unittest.mock import patch, MagicMock - fake_cursor = MagicMock(name='fake_cursor') FakeCursor = MagicMock(name='FakeCursor', spec=Cursor) FakeCursor.return_value = fake_cursor @@ -77,6 +77,7 @@ class DummyTable(self.Base): __tablename__ = 'dummy' pk = sa.Column(sa.String, primary_key=True) obj_col = sa.Column(Object) + self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE dummy (\n\tpk STRING NOT NULL, \n\tobj_col OBJECT, ' @@ -91,6 +92,7 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String) + self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -105,6 +107,7 @@ class DummyTable(self.Base): __tablename__ = 't' ts = sa.Column(sa.BigInteger, primary_key=True) p = sa.Column(sa.BigInteger, sa.Computed("date_trunc('day', ts)")) + self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -119,6 +122,7 @@ class DummyTable(self.Base): __tablename__ = 't' ts = sa.Column(sa.BigInteger, primary_key=True) p = sa.Column(sa.BigInteger, sa.Computed("date_trunc('day', ts)", persisted=False)) + with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) @@ -131,6 +135,7 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String) + self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -166,6 +171,7 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String, primary_key=True) + self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -207,6 +213,7 @@ def test_column_pk_nullable(self): class DummyTable(self.Base): __tablename__ = 't' pk = sa.Column(sa.String, primary_key=True, nullable=True) + with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) @@ -230,5 +237,33 @@ class DummyTable(self.Base): __tablename__ = 't' pk = sa.Column(sa.String, primary_key=True) a = sa.Column(Geopoint, crate_index=False) + + with self.assertRaises(sa.exc.CompileError): + self.Base.metadata.create_all(bind=self.engine) + + def test_text_column_without_columnstore(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + a = sa.Column(sa.String, crate_columnstore=False) + b = sa.Column(sa.String, crate_columnstore=True) + c = sa.Column(sa.String) + + self.Base.metadata.create_all(bind=self.engine) + + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'a STRING STORAGE WITH (columnstore = false), \n\t' + 'b STRING, \n\t' + 'c STRING, \n\t' + 'PRIMARY KEY (pk)\n)\n\n'), ()) + + def test_non_text_column_without_columnstore(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + a = sa.Column(sa.Integer, crate_columnstore=False) + with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) From 6990dc49f6b44163c6fb230f5266d0098b761c79 Mon Sep 17 00:00:00 2001 From: Matthias Fetzer Date: Mon, 29 May 2023 11:12:45 +0200 Subject: [PATCH 03/18] Columnstore: Adding docs for controlling the columnstore --- 
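Notes: a minimal usage sketch of the new ``crate_columnstore`` option, mirroring the
tests added in this series; the table and column names are illustrative only.

    import sqlalchemy as sa
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Log(Base):
        __tablename__ = "log"
        id = sa.Column(sa.String, primary_key=True)
        # Disabling the column store is only supported for TEXT/STRING
        # columns; on other types, DDL compilation raises a CompileError.
        payload = sa.Column(sa.String, crate_columnstore=False)

    # The DDL emitted by the CrateDB dialect then contains:
    #   payload STRING STORAGE WITH (columnstore = false)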
docs/sqlalchemy.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/sqlalchemy.rst b/docs/sqlalchemy.rst index 00bf5d00..b16f2abf 100644 --- a/docs/sqlalchemy.rst +++ b/docs/sqlalchemy.rst @@ -205,6 +205,7 @@ system `: ... more_details = sa.Column(types.ObjectArray) ... name_ft = sa.Column(sa.String) ... quote_ft = sa.Column(sa.String) + ... even_more_details = sa.Column(sa.String, crate_columnstore=False) ... ... __mapper_args__ = { ... 'exclude_properties': ['name_ft', 'quote_ft'] @@ -220,6 +221,7 @@ In this example, we: - Use standard SQLAlchemy types for the ``id``, ``name``, and ``quote`` columns - Use ``nullable=False`` to define a ``NOT NULL`` constraint - Disable indexing of the ``name`` column using ``crate_index=False`` +- Disable the columnstore of the ``even_more_details`` column using ``crate_columnstore=False`` - Define a computed column ``name_normalized`` (based on ``name``) that translates into a generated column - Use the `Object`_ extension type for the ``details`` column From c74f757eb2a402e13d91ca440fd38b4500c67cc0 Mon Sep 17 00:00:00 2001 From: Matthias Fetzer Date: Mon, 29 May 2023 11:38:21 +0200 Subject: [PATCH 04/18] Columnstore: Revert (auto) reformatting --- src/crate/client/sqlalchemy/tests/create_table_test.py | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/crate/client/sqlalchemy/tests/create_table_test.py b/src/crate/client/sqlalchemy/tests/create_table_test.py index 91f54bca..f876655d 100644 --- a/src/crate/client/sqlalchemy/tests/create_table_test.py +++ b/src/crate/client/sqlalchemy/tests/create_table_test.py @@ -20,7 +20,6 @@ # software solely pursuant to the terms of the relevant commercial agreement. import sqlalchemy as sa - try: from sqlalchemy.orm import declarative_base except ImportError: @@ -32,6 +31,7 @@ from unittest import TestCase from unittest.mock import patch, MagicMock + fake_cursor = MagicMock(name='fake_cursor') FakeCursor = MagicMock(name='FakeCursor', spec=Cursor) FakeCursor.return_value = fake_cursor @@ -77,7 +77,6 @@ class DummyTable(self.Base): __tablename__ = 'dummy' pk = sa.Column(sa.String, primary_key=True) obj_col = sa.Column(Object) - self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE dummy (\n\tpk STRING NOT NULL, \n\tobj_col OBJECT, ' @@ -92,7 +91,6 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String) - self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -107,7 +105,6 @@ class DummyTable(self.Base): __tablename__ = 't' ts = sa.Column(sa.BigInteger, primary_key=True) p = sa.Column(sa.BigInteger, sa.Computed("date_trunc('day', ts)")) - self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -122,7 +119,6 @@ class DummyTable(self.Base): __tablename__ = 't' ts = sa.Column(sa.BigInteger, primary_key=True) p = sa.Column(sa.BigInteger, sa.Computed("date_trunc('day', ts)", persisted=False)) - with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) @@ -135,7 +131,6 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String) - self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -171,7 +166,6 @@ class DummyTable(self.Base): } pk = sa.Column(sa.String, primary_key=True) p = sa.Column(sa.String, primary_key=True) - 
self.Base.metadata.create_all(bind=self.engine) fake_cursor.execute.assert_called_with( ('\nCREATE TABLE t (\n\t' @@ -213,7 +207,6 @@ def test_column_pk_nullable(self): class DummyTable(self.Base): __tablename__ = 't' pk = sa.Column(sa.String, primary_key=True, nullable=True) - with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) @@ -237,7 +230,6 @@ class DummyTable(self.Base): __tablename__ = 't' pk = sa.Column(sa.String, primary_key=True) a = sa.Column(Geopoint, crate_index=False) - with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) From 7b86801062437465e8740dc5e298a0d18856ac1d Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 8 Jun 2023 20:10:22 +0200 Subject: [PATCH 05/18] Columnstore: Add changelog item --- CHANGES.txt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.txt b/CHANGES.txt index 141b0667..0a7196f9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,9 @@ Changes for crate Unreleased ========== +- SQLAlchemy DDL: Allow turning off column store using ``crate_columnstore=False``. + Thanks, @fetzerms. + 2023/04/18 0.31.1 ================= From 4c62ecf4671a3b7b515f63dcb24a24c6e7895f05 Mon Sep 17 00:00:00 2001 From: Jan Likar Date: Fri, 26 May 2023 23:38:10 +0200 Subject: [PATCH 06/18] Implement server_default for SA columns CrateDB's SQLAlchemy dialect now handles the `server_default` when generating table DDL. Fix #454. --- CHANGES.txt | 1 + docs/sqlalchemy.rst | 4 +- src/crate/client/sqlalchemy/compiler.py | 5 +- .../sqlalchemy/tests/create_table_test.py | 52 +++++++++++++++++++ 4 files changed, 60 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0a7196f9..f8d0f54a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ Unreleased SQLAlchemy 2.0 by adding the new ``insert_returning`` and ``update_returning`` flags in the CrateDB dialect. +- SQLAlchemy DDL: Allow setting ``server_default`` on columns to enable server-generated defaults. 2023/03/30 0.31.0 ================= diff --git a/docs/sqlalchemy.rst b/docs/sqlalchemy.rst index b16f2abf..2c1a7471 100644 --- a/docs/sqlalchemy.rst +++ b/docs/sqlalchemy.rst @@ -206,6 +206,7 @@ system `: ... name_ft = sa.Column(sa.String) ... quote_ft = sa.Column(sa.String) ... even_more_details = sa.Column(sa.String, crate_columnstore=False) + ... created_at = sa.Column(sa.DateTime, server_default=sa.func.now()) ... ... __mapper_args__ = { ... 'exclude_properties': ['name_ft', 'quote_ft'] @@ -221,13 +222,14 @@ In this example, we: - Use standard SQLAlchemy types for the ``id``, ``name``, and ``quote`` columns - Use ``nullable=False`` to define a ``NOT NULL`` constraint - Disable indexing of the ``name`` column using ``crate_index=False`` -- Disable the columnstore of the ``even_more_details`` column using ``crate_columnstore=False`` - Define a computed column ``name_normalized`` (based on ``name``) that translates into a generated column - Use the `Object`_ extension type for the ``details`` column - Use the `ObjectArray`_ extension type for the ``more_details`` column - Set up the ``name_ft`` and ``quote_ft`` fulltext indexes, but exclude them from the mapping (so SQLAlchemy doesn't try to update them as if they were columns) +- Disable the columnstore of the ``even_more_details`` column using ``crate_columnstore=False`` +- Add a ``created_at`` column whose default value is set by CrateDB's ``now()`` function. .. 
TIP:: diff --git a/src/crate/client/sqlalchemy/compiler.py b/src/crate/client/sqlalchemy/compiler.py index efa88d17..3ac6fa57 100644 --- a/src/crate/client/sqlalchemy/compiler.py +++ b/src/crate/client/sqlalchemy/compiler.py @@ -108,7 +108,10 @@ class CrateDDLCompiler(compiler.DDLCompiler): def get_column_specification(self, column, **kwargs): colspec = self.preparer.format_column(column) + " " + \ self.dialect.type_compiler.process(column.type) - # TODO: once supported add default here + + default = self.get_column_default_string(column) + if default is not None: + colspec += " DEFAULT " + default if column.computed is not None: colspec += " " + self.process(column.computed) diff --git a/src/crate/client/sqlalchemy/tests/create_table_test.py b/src/crate/client/sqlalchemy/tests/create_table_test.py index f876655d..b7fb9b87 100644 --- a/src/crate/client/sqlalchemy/tests/create_table_test.py +++ b/src/crate/client/sqlalchemy/tests/create_table_test.py @@ -259,3 +259,55 @@ class DummyTable(self.Base): with self.assertRaises(sa.exc.CompileError): self.Base.metadata.create_all(bind=self.engine) + + def test_column_server_default_text_func(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + a = sa.Column(sa.DateTime, server_default=sa.text("now()")) + + self.Base.metadata.create_all(bind=self.engine) + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'a TIMESTAMP DEFAULT now(), \n\t' + 'PRIMARY KEY (pk)\n)\n\n'), ()) + + def test_column_server_default_string(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + a = sa.Column(sa.String, server_default="Zaphod") + + self.Base.metadata.create_all(bind=self.engine) + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'a STRING DEFAULT \'Zaphod\', \n\t' + 'PRIMARY KEY (pk)\n)\n\n'), ()) + + def test_column_server_default_func(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + a = sa.Column(sa.DateTime, server_default=sa.func.now()) + + self.Base.metadata.create_all(bind=self.engine) + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'a TIMESTAMP DEFAULT now(), \n\t' + 'PRIMARY KEY (pk)\n)\n\n'), ()) + + def test_column_server_default_text_constant(self): + class DummyTable(self.Base): + __tablename__ = 't' + pk = sa.Column(sa.String, primary_key=True) + answer = sa.Column(sa.Integer, server_default=sa.text("42")) + + self.Base.metadata.create_all(bind=self.engine) + fake_cursor.execute.assert_called_with( + ('\nCREATE TABLE t (\n\t' + 'pk STRING NOT NULL, \n\t' + 'answer INT DEFAULT 42, \n\t' + 'PRIMARY KEY (pk)\n)\n\n'), ()) From cac7e2b5905741d973bd49ef4fb48a932a2a0250 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 26 Jun 2023 10:12:44 +0200 Subject: [PATCH 07/18] Fix CHANGES.txt --- CHANGES.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index f8d0f54a..8e63b92c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,9 @@ Unreleased - SQLAlchemy DDL: Allow turning off column store using ``crate_columnstore=False``. Thanks, @fetzerms. +- SQLAlchemy DDL: Allow setting ``server_default`` on columns to enable + server-generated defaults. Thanks, @JanLikar. 
+ 2023/04/18 0.31.1 ================= @@ -16,7 +19,6 @@ Unreleased SQLAlchemy 2.0 by adding the new ``insert_returning`` and ``update_returning`` flags in the CrateDB dialect. -- SQLAlchemy DDL: Allow setting ``server_default`` on columns to enable server-generated defaults. 2023/03/30 0.31.0 ================= From 6235bccacf1babb51cbf23db9fcdba24f9bb1ed5 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Mon, 26 Jun 2023 10:20:39 +0200 Subject: [PATCH 08/18] Allow handling timezone-aware datetime values when inserting or updating --- CHANGES.txt | 2 ++ docs/by-example/client.rst | 18 ++++++++++++++++++ docs/data-types.rst | 16 +++++++++++++++- src/crate/client/http.py | 10 +++++++--- src/crate/client/test_http.py | 15 ++++++++++++++- src/crate/client/tests.py | 2 ++ 6 files changed, 58 insertions(+), 5 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8e63b92c..cf5463bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,8 @@ Unreleased - SQLAlchemy DDL: Allow setting ``server_default`` on columns to enable server-generated defaults. Thanks, @JanLikar. +- Allow handling "aware" datetime values with time zone info when inserting or updating. + 2023/04/18 0.31.1 ================= diff --git a/docs/by-example/client.rst b/docs/by-example/client.rst index c9046d68..e32e4bbc 100644 --- a/docs/by-example/client.rst +++ b/docs/by-example/client.rst @@ -139,6 +139,24 @@ Refresh locations: >>> cursor.execute("REFRESH TABLE locations") +Updating Data +============= + +Both when inserting or updating data, values for ``TIMESTAMP`` columns can be obtained +in different formats. Both literal strings and datetime objects are supported. + + >>> import datetime as dt + >>> timestamp_full = "2023-06-26T09:24:00.123+02:00" + >>> timestamp_date = "2023-06-26" + >>> datetime_aware = dt.datetime.fromisoformat("2023-06-26T09:24:00.123+02:00") + >>> datetime_naive = dt.datetime.fromisoformat("2023-06-26T09:24:00.123") + >>> datetime_date = dt.date.fromisoformat("2023-06-26") + >>> cursor.execute("UPDATE locations SET date=? WHERE name='Cloverleaf'", (timestamp_full, )) + >>> cursor.execute("UPDATE locations SET date=? WHERE name='Cloverleaf'", (timestamp_date, )) + >>> cursor.execute("UPDATE locations SET date=? WHERE name='Cloverleaf'", (datetime_aware, )) + >>> cursor.execute("UPDATE locations SET date=? WHERE name='Cloverleaf'", (datetime_naive, )) + >>> cursor.execute("UPDATE locations SET date=? WHERE name='Cloverleaf'", (datetime_date, )) + Selecting Data ============== diff --git a/docs/data-types.rst b/docs/data-types.rst index d6a34e3b..1f1a10d3 100644 --- a/docs/data-types.rst +++ b/docs/data-types.rst @@ -94,9 +94,20 @@ __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#c .. NOTE:: - The type that ``date`` and ``datetime`` objects are mapped depends on the + The type that ``date`` and ``datetime`` objects are mapped to, depends on the CrateDB column type. +.. NOTE:: + + Values of ``TIMESTAMP`` columns will always be stored using a ``LONG`` type, + representing the `Unix time`_ (epoch) timestamp, i.e. number of seconds which + have passed since 00:00:00 UTC on Thursday, 1 January 1970. + + This means, when inserting or updating records using timezone-aware Python + ``datetime`` objects, timezone information will not be preserved. If you + need to store it, you will need to use a separate column. + + .. 
_data-types-sqlalchemy: SQLAlchemy @@ -156,3 +167,6 @@ __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#o __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#array __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#geo-point __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#geo-shape + + +.. _Unix time: https://en.wikipedia.org/wiki/Unix_time diff --git a/src/crate/client/http.py b/src/crate/client/http.py index e932f732..d4522612 100644 --- a/src/crate/client/http.py +++ b/src/crate/client/http.py @@ -33,7 +33,7 @@ from urllib.parse import urlparse from base64 import b64encode from time import time -from datetime import datetime, date +from datetime import datetime, date, timezone from decimal import Decimal from urllib3 import connection_from_url from urllib3.connection import HTTPConnection @@ -82,13 +82,17 @@ def super_len(o): class CrateJsonEncoder(json.JSONEncoder): - epoch = datetime(1970, 1, 1) + epoch_aware = datetime(1970, 1, 1, tzinfo=timezone.utc) + epoch_naive = datetime(1970, 1, 1) def default(self, o): if isinstance(o, Decimal): return str(o) if isinstance(o, datetime): - delta = o - self.epoch + if o.tzinfo is not None: + delta = o - self.epoch_aware + else: + delta = o - self.epoch_naive return int(delta.microseconds / 1000.0 + (delta.seconds + delta.days * 24 * 3600) * 1000.0) if isinstance(o, date): diff --git a/src/crate/client/test_http.py b/src/crate/client/test_http.py index ee32778b..0f7afa35 100644 --- a/src/crate/client/test_http.py +++ b/src/crate/client/test_http.py @@ -40,7 +40,7 @@ from urllib.parse import urlparse, parse_qs from setuptools.ssl_support import find_ca_bundle -from .http import Client, _get_socket_opts, _remove_certs_for_non_https +from .http import Client, CrateJsonEncoder, _get_socket_opts, _remove_certs_for_non_https from .exceptions import ConnectionError, ProgrammingError @@ -626,3 +626,16 @@ def test_username(self): self.assertEqual(TestingHTTPServer.SHARED['usernameFromXUser'], 'testDBUser') self.assertEqual(TestingHTTPServer.SHARED['username'], 'testDBUser') self.assertEqual(TestingHTTPServer.SHARED['password'], 'test:password') + + +class TestCrateJsonEncoder(TestCase): + + def test_naive_datetime(self): + data = dt.datetime.fromisoformat("2023-06-26T09:24:00.123") + result = json.dumps(data, cls=CrateJsonEncoder) + self.assertEqual(result, "1687771440123") + + def test_aware_datetime(self): + data = dt.datetime.fromisoformat("2023-06-26T09:24:00.123+02:00") + result = json.dumps(data, cls=CrateJsonEncoder) + self.assertEqual(result, "1687764240123") diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py index 7bf1487d..c2ea3813 100644 --- a/src/crate/client/tests.py +++ b/src/crate/client/tests.py @@ -51,6 +51,7 @@ RetryOnTimeoutServerTest, RequestsCaBundleTest, TestUsernameSentAsHeader, + TestCrateJsonEncoder, TestDefaultSchemaHeader, ) from .sqlalchemy.tests import test_suite as sqlalchemy_test_suite @@ -341,6 +342,7 @@ def test_suite(): suite.addTest(unittest.makeSuite(RetryOnTimeoutServerTest)) suite.addTest(unittest.makeSuite(RequestsCaBundleTest)) suite.addTest(unittest.makeSuite(TestUsernameSentAsHeader)) + suite.addTest(unittest.makeSuite(TestCrateJsonEncoder)) suite.addTest(unittest.makeSuite(TestDefaultSchemaHeader)) suite.addTest(sqlalchemy_test_suite()) suite.addTest(doctest.DocTestSuite('crate.client.connection')) From 4e20913364cf0bdfb738477ed82a9a8e7f5d2b32 Mon Sep 17 00:00:00 2001 From: Andreas 
Motl Date: Wed, 28 Jun 2023 15:40:49 +0200 Subject: [PATCH 09/18] Update documentation about handling timezone-aware timestamps MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Mathias Fußenegger --- CHANGES.txt | 2 +- docs/by-example/client.rst | 5 +++-- docs/data-types.rst | 11 ++++++----- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index cf5463bf..04889d1b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,7 +11,7 @@ Unreleased - SQLAlchemy DDL: Allow setting ``server_default`` on columns to enable server-generated defaults. Thanks, @JanLikar. -- Allow handling "aware" datetime values with time zone info when inserting or updating. +- Allow handling datetime values tagged with time zone info when inserting or updating. 2023/04/18 0.31.1 ================= diff --git a/docs/by-example/client.rst b/docs/by-example/client.rst index e32e4bbc..e053d73f 100644 --- a/docs/by-example/client.rst +++ b/docs/by-example/client.rst @@ -142,8 +142,9 @@ Refresh locations: Updating Data ============= -Both when inserting or updating data, values for ``TIMESTAMP`` columns can be obtained -in different formats. Both literal strings and datetime objects are supported. +Values for ``TIMESTAMP`` columns can be supplied as a string literal, ``date``, +or ``datetime`` object. If a value contains timezone information, it is converted to +UTC, and the timezone information is discarded. >>> import datetime as dt >>> timestamp_full = "2023-06-26T09:24:00.123+02:00" diff --git a/docs/data-types.rst b/docs/data-types.rst index 1f1a10d3..acad570c 100644 --- a/docs/data-types.rst +++ b/docs/data-types.rst @@ -99,13 +99,14 @@ __ https://crate.io/docs/crate/reference/en/latest/general/ddl/data-types.html#c .. NOTE:: - Values of ``TIMESTAMP`` columns will always be stored using a ``LONG`` type, - representing the `Unix time`_ (epoch) timestamp, i.e. number of seconds which - have passed since 00:00:00 UTC on Thursday, 1 January 1970. + When using ``date`` or ``datetime`` objects with ``timezone`` information, + the value is implicitly converted to a `Unix time`_ (epoch) timestamp, i.e. + the number of milliseconds which have passed since 00:00:00 UTC on + Thursday, 1 January 1970. This means, when inserting or updating records using timezone-aware Python - ``datetime`` objects, timezone information will not be - preserved. If you need to store it, you will need to use a separate column. + ``date`` or ``datetime`` objects, timezone information will not be + preserved. If you need to store it, you will need to use a separate column. .. _data-types-sqlalchemy: From 640a2db4fbf7d4c4d423e4e9a14ec06cf014281b Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 4 Jul 2023 10:10:39 +0200 Subject: [PATCH 10/18] SQLAlchemy: Fix SQL statement caching for CrateDB's `OBJECT` type The SQLAlchemy implementation of CrateDB's `OBJECT` type offers indexed access to the instance's content in form of a dictionary. Thus, it must not use `cache_ok = True` on its implementation, i.e. this part of the compiled SQL clause must not be cached. Tests: Add integration test cases verifying SA's SQL statement caching Specifically, the special types `OBJECT` and `ARRAY` are of concern here.
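A minimal sketch of the failure mode, using the `Character` entity from the
new test cases (illustrative only): with `cache_ok = True`, both statements
below produce the same cache key, so the second one could be served from the
compiled-SQL cache of the first, addressing the wrong `OBJECT` key.

    import sqlalchemy as sa

    # Two statements that differ only in the OBJECT key being accessed.
    stmt1 = sa.select(Character).where(Character.data['x'] == 1)
    stmt2 = sa.select(Character).where(Character.data['y'] == 2)

    # With incorrect caching, stmt2 may compile to `... WHERE data['x'] = ?`.
    # Setting `cache_ok = False` on the type prevents this.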
--- CHANGES.txt | 2 + src/crate/client/sqlalchemy/tests/__init__.py | 9 +- .../client/sqlalchemy/tests/query_caching.py | 117 ++++++++++++++++++ src/crate/client/sqlalchemy/types.py | 2 +- src/crate/client/tests.py | 6 +- 5 files changed, 132 insertions(+), 4 deletions(-) create mode 100644 src/crate/client/sqlalchemy/tests/query_caching.py diff --git a/CHANGES.txt b/CHANGES.txt index 04889d1b..8b41400c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,8 @@ Unreleased - Allow handling datetime values tagged with time zone info when inserting or updating. +- SQLAlchemy: Fix SQL statement caching for CrateDB's ``OBJECT`` type. + 2023/04/18 0.31.1 ================= diff --git a/src/crate/client/sqlalchemy/tests/__init__.py b/src/crate/client/sqlalchemy/tests/__init__.py index acca5db0..3c032ebb 100644 --- a/src/crate/client/sqlalchemy/tests/__init__.py +++ b/src/crate/client/sqlalchemy/tests/__init__.py @@ -23,9 +23,10 @@ from .dialect_test import SqlAlchemyDialectTest from .function_test import SqlAlchemyFunctionTest from .warnings_test import SqlAlchemyWarningsTest +from .query_caching import SqlAlchemyQueryCompilationCaching -def test_suite(): +def test_suite_unit(): tests = TestSuite() tests.addTest(makeSuite(SqlAlchemyConnectionTest)) tests.addTest(makeSuite(SqlAlchemyDictTypeTest)) @@ -42,3 +43,9 @@ def test_suite(): tests.addTest(makeSuite(SqlAlchemyArrayTypeTest)) tests.addTest(makeSuite(SqlAlchemyWarningsTest)) return tests + + +def test_suite_integration(): + tests = TestSuite() + tests.addTest(makeSuite(SqlAlchemyQueryCompilationCaching)) + return tests diff --git a/src/crate/client/sqlalchemy/tests/query_caching.py b/src/crate/client/sqlalchemy/tests/query_caching.py new file mode 100644 index 00000000..fb4bdec3 --- /dev/null +++ b/src/crate/client/sqlalchemy/tests/query_caching.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements. See the NOTICE file distributed with this work for +# additional information regarding copyright ownership. Crate licenses +# this file to you under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. You may +# obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# However, if you have executed another commercial license agreement +# with Crate these terms will supersede the license and you may use the +# software solely pursuant to the terms of the relevant commercial agreement. 
+ +from __future__ import absolute_import +from unittest import TestCase, skipIf + +import sqlalchemy as sa +from sqlalchemy.orm import Session +from sqlalchemy.sql.operators import eq + +from crate.client.sqlalchemy import SA_VERSION, SA_1_4 +from crate.testing.settings import crate_host + +try: + from sqlalchemy.orm import declarative_base +except ImportError: + from sqlalchemy.ext.declarative import declarative_base + +from crate.client.sqlalchemy.types import Object, ObjectArray + + +class SqlAlchemyQueryCompilationCaching(TestCase): + + def setUp(self): + self.engine = sa.create_engine(f"crate://{crate_host}") + self.metadata = sa.MetaData(schema="testdrive") + self.session = Session(bind=self.engine) + self.Character = self.setup_entity() + + def setup_entity(self): + """ + Define ORM entity. + """ + Base = declarative_base(metadata=self.metadata) + + class Character(Base): + __tablename__ = 'characters' + name = sa.Column(sa.String, primary_key=True) + age = sa.Column(sa.Integer) + data = sa.Column(Object) + data_list = sa.Column(ObjectArray) + + return Character + + def setup_data(self): + """ + Insert two records into the `characters` table. + """ + self.metadata.drop_all(self.engine) + self.metadata.create_all(self.engine) + + Character = self.Character + char1 = Character(name='Trillian', data={'x': 1}, data_list=[{'foo': 1, 'bar': 10}]) + char2 = Character(name='Slartibartfast', data={'y': 2}, data_list=[{'bar': 2}]) + self.session.add(char1) + self.session.add(char2) + self.session.commit() + self.session.execute(sa.text("REFRESH TABLE testdrive.characters;")) + + @skipIf(SA_VERSION < SA_1_4, "On SA13, the 'ResultProxy' object has no attribute 'scalar_one'") + def test_object_multiple_select(self): + """ + The SQLAlchemy implementation of CrateDB's `OBJECT` type offers indexed + access to the instance's content in form of a dictionary. Thus, it must + not use `cache_ok = True` on its implementation, i.e. this part of the + compiled SQL clause must not be cached. + + This test verifies that two subsequent `SELECT` statements are translated + well, and don't trip on incorrect SQL compiled statement caching. + """ + self.setup_data() + Character = self.Character + + selectable = sa.select(Character).where(Character.data['x'] == 1) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"x": 1}, result) + + selectable = sa.select(Character).where(Character.data['y'] == 2) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"y": 2}, result) + + @skipIf(SA_VERSION < SA_1_4, "On SA13, the 'ResultProxy' object has no attribute 'scalar_one'") + def test_objectarray_multiple_select(self): + """ + The SQLAlchemy implementation of CrateDB's `ARRAY` type in form of the + `ObjectArray`, does *not* offer indexed access to the instance's content. + Thus, using `cache_ok = True` on that type should be sane, and not mess + up SQLAlchemy's SQL compiled statement caching. 
+ """ + self.setup_data() + Character = self.Character + + selectable = sa.select(Character).where(Character.data_list['foo'].any(1, operator=eq)) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"x": 1}, result) + + selectable = sa.select(Character).where(Character.data_list['bar'].any(2, operator=eq)) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"y": 2}, result) diff --git a/src/crate/client/sqlalchemy/types.py b/src/crate/client/sqlalchemy/types.py index 1a3d7a06..587858ac 100644 --- a/src/crate/client/sqlalchemy/types.py +++ b/src/crate/client/sqlalchemy/types.py @@ -132,7 +132,7 @@ def __eq__(self, other): class _Craty(sqltypes.UserDefinedType): - cache_ok = True + cache_ok = False class Comparator(sqltypes.TypeEngine.Comparator): diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py index c2ea3813..953988ab 100644 --- a/src/crate/client/tests.py +++ b/src/crate/client/tests.py @@ -54,7 +54,8 @@ TestCrateJsonEncoder, TestDefaultSchemaHeader, ) -from .sqlalchemy.tests import test_suite as sqlalchemy_test_suite +from .sqlalchemy.tests import test_suite_unit as sqlalchemy_test_suite_unit +from .sqlalchemy.tests import test_suite_integration as sqlalchemy_test_suite_integration log = logging.getLogger('crate.testing.layer') ch = logging.StreamHandler() @@ -344,7 +345,7 @@ def test_suite(): suite.addTest(unittest.makeSuite(TestUsernameSentAsHeader)) suite.addTest(unittest.makeSuite(TestCrateJsonEncoder)) suite.addTest(unittest.makeSuite(TestDefaultSchemaHeader)) - suite.addTest(sqlalchemy_test_suite()) + suite.addTest(sqlalchemy_test_suite_unit()) suite.addTest(doctest.DocTestSuite('crate.client.connection')) suite.addTest(doctest.DocTestSuite('crate.client.http')) @@ -394,6 +395,7 @@ def test_suite(): encoding='utf-8' ) s.layer = ensure_cratedb_layer() + s.addTest(sqlalchemy_test_suite_integration()) suite.addTest(s) return suite From ca52b12a82da374014dfd08cda97b09692b02ff4 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 5 Jul 2023 11:38:33 +0200 Subject: [PATCH 11/18] CI: Run on all pull requests, also on stacked ones --- .github/workflows/tests.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b09c1bfc..fd4fa6e2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -3,8 +3,7 @@ name: Tests on: push: branches: [ master ] - pull_request: - branches: [ master ] + pull_request: ~ workflow_dispatch: concurrency: From ce15b62ea33e1c01adcd84330bea5d11c5214b09 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 5 Jul 2023 11:34:01 +0200 Subject: [PATCH 12/18] SQLAlchemy: Use JSON type adapter for implementing CrateDB's `OBJECT` SQLAlchemy's `sqltypes.JSON` provides a facade for vendor-specific JSON types. Since it supports JSON SQL operations, it only works on backends that have an actual JSON type, which are currently PostgreSQL, MySQL, SQLite, and Microsoft SQL Server. This patch starts leveraging the same infrastructure, thus bringing corresponding interfaces to the CrateDB dialect. The major difference is that it will not actually do any JSON marshalling, but propagate corresponding data structures 1:1, because within CrateDB's SQL, `OBJECT`s do not need to be serialized into JSON strings before transfer. 
--- CHANGES.txt | 2 ++ src/crate/client/sqlalchemy/compiler.py | 17 +++++++-- src/crate/client/sqlalchemy/dialect.py | 15 +++++--- .../client/sqlalchemy/tests/compiler_test.py | 4 +-- .../client/sqlalchemy/tests/dict_test.py | 8 ++--- .../client/sqlalchemy/tests/match_test.py | 4 +-- .../client/sqlalchemy/tests/query_caching.py | 28 ++++++++++++++- .../client/sqlalchemy/tests/warnings_test.py | 35 +++++++++++++++++-- src/crate/client/sqlalchemy/types.py | 30 ++++++++++------ 9 files changed, 114 insertions(+), 29 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 8b41400c..0bbb7465 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -15,6 +15,8 @@ Unreleased - SQLAlchemy: Fix SQL statement caching for CrateDB's ``OBJECT`` type. +- SQLAlchemy: Refactor ``OBJECT`` type to use SQLAlchemy's JSON type infrastructure. + 2023/04/18 0.31.1 ================= diff --git a/src/crate/client/sqlalchemy/compiler.py b/src/crate/client/sqlalchemy/compiler.py index 3ac6fa57..3965c9e1 100644 --- a/src/crate/client/sqlalchemy/compiler.py +++ b/src/crate/client/sqlalchemy/compiler.py @@ -26,7 +26,7 @@ from sqlalchemy.dialects.postgresql.base import PGCompiler from sqlalchemy.sql import compiler from sqlalchemy.types import String -from .types import MutableDict, _Craty, Geopoint, Geoshape +from .types import MutableDict, ObjectTypeImpl, Geopoint, Geoshape from .sa_version import SA_VERSION, SA_1_4 @@ -41,7 +41,7 @@ def rewrite_update(clauseelement, multiparams, params): "col['x'] = ?, col['y'] = ?", (1, 2) - by using the `Craty` (`MutableDict`) type. + by using the `ObjectType` (`MutableDict`) type. The update statement is only rewritten if an item of the MutableDict was changed. """ @@ -124,7 +124,7 @@ def get_column_specification(self, column, **kwargs): ) if column.dialect_options['crate'].get('index') is False: - if isinstance(column.type, (Geopoint, Geoshape, _Craty)): + if isinstance(column.type, (Geopoint, Geoshape, ObjectTypeImpl)): raise sa.exc.CompileError( "Disabling indexing is not supported for column " "types OBJECT, GEO_POINT, and GEO_SHAPE" @@ -217,6 +217,9 @@ def visit_ARRAY(self, type_, **kw): "CrateDB doesn't support multidimensional arrays") return 'ARRAY({0})'.format(self.process(type_.item_type)) + def visit_OBJECT(self, type_, **kw): + return "OBJECT" + class CrateCompiler(compiler.SQLCompiler): @@ -226,6 +229,14 @@ def visit_getitem_binary(self, binary, operator, **kw): binary.right.value ) + def visit_json_getitem_op_binary( + self, binary, operator, _cast_applied=False, **kw + ): + return "{0}['{1}']".format( + self.process(binary.left, **kw), + binary.right.value + ) + def visit_any(self, element, **kw): return "%s%sANY (%s)" % ( self.process(element.left, **kw), diff --git a/src/crate/client/sqlalchemy/dialect.py b/src/crate/client/sqlalchemy/dialect.py index 5737f994..e992d41a 100644 --- a/src/crate/client/sqlalchemy/dialect.py +++ b/src/crate/client/sqlalchemy/dialect.py @@ -183,10 +183,17 @@ class CrateDialect(default.DefaultDialect): insert_returning = True update_returning = True - def __init__(self, *args, **kwargs): - super(CrateDialect, self).__init__(*args, **kwargs) - # currently our sql parser doesn't support unquoted column names that - # start with _. Adding it here causes sqlalchemy to quote such columns + def __init__(self, **kwargs): + default.DefaultDialect.__init__(self, **kwargs) + + # CrateDB does not need `OBJECT` types to be serialized as JSON. + # Corresponding data is forwarded 1:1, and will get marshalled + # by the low-level driver. 
+ self._json_deserializer = lambda x: x + self._json_serializer = lambda x: x + + # Currently, our SQL parser doesn't support unquoted column names that + # start with _. Adding it here causes sqlalchemy to quote such columns. self.identifier_preparer.illegal_initial_characters.add('_') def initialize(self, connection): diff --git a/src/crate/client/sqlalchemy/tests/compiler_test.py b/src/crate/client/sqlalchemy/tests/compiler_test.py index 31ed7f3c..17612232 100644 --- a/src/crate/client/sqlalchemy/tests/compiler_test.py +++ b/src/crate/client/sqlalchemy/tests/compiler_test.py @@ -27,7 +27,7 @@ from sqlalchemy.sql import text, Update from crate.client.sqlalchemy.sa_version import SA_VERSION, SA_1_4, SA_2_0 -from crate.client.sqlalchemy.types import Craty +from crate.client.sqlalchemy.types import ObjectType class SqlAlchemyCompilerTest(TestCase): @@ -38,7 +38,7 @@ def setUp(self): self.metadata = sa.MetaData() self.mytable = sa.Table('mytable', self.metadata, sa.Column('name', sa.String), - sa.Column('data', Craty)) + sa.Column('data', ObjectType)) self.update = Update(self.mytable).where(text('name=:name')) self.values = [{'name': 'crate'}] diff --git a/src/crate/client/sqlalchemy/tests/dict_test.py b/src/crate/client/sqlalchemy/tests/dict_test.py index 2324591e..9695882b 100644 --- a/src/crate/client/sqlalchemy/tests/dict_test.py +++ b/src/crate/client/sqlalchemy/tests/dict_test.py @@ -31,7 +31,7 @@ except ImportError: from sqlalchemy.ext.declarative import declarative_base -from crate.client.sqlalchemy.types import Craty, ObjectArray +from crate.client.sqlalchemy.types import ObjectArray, ObjectType from crate.client.cursor import Cursor @@ -47,7 +47,7 @@ def setUp(self): metadata = sa.MetaData() self.mytable = sa.Table('mytable', metadata, sa.Column('name', sa.String), - sa.Column('data', Craty)) + sa.Column('data', ObjectType)) def assertSQL(self, expected_str, selectable): actual_expr = selectable.compile(bind=self.engine) @@ -124,7 +124,7 @@ class Character(Base): __tablename__ = 'characters' name = sa.Column(sa.String, primary_key=True) age = sa.Column(sa.Integer) - data = sa.Column(Craty) + data = sa.Column(ObjectType) data_list = sa.Column(ObjectArray) session = Session(bind=self.engine) @@ -140,7 +140,7 @@ def test_assign_null_to_object_array(self): self.assertEqual(char_3.data_list, [None]) @patch('crate.client.connection.Cursor', FakeCursor) - def test_assign_to_craty_type_after_commit(self): + def test_assign_to_object_type_after_commit(self): session, Character = self.set_up_character_and_cursor( return_value=[('Trillian', None)] ) diff --git a/src/crate/client/sqlalchemy/tests/match_test.py b/src/crate/client/sqlalchemy/tests/match_test.py index fdd5b7d0..735709c3 100644 --- a/src/crate/client/sqlalchemy/tests/match_test.py +++ b/src/crate/client/sqlalchemy/tests/match_test.py @@ -30,7 +30,7 @@ except ImportError: from sqlalchemy.ext.declarative import declarative_base -from crate.client.sqlalchemy.types import Craty +from crate.client.sqlalchemy.types import ObjectType from crate.client.sqlalchemy.predicates import match from crate.client.cursor import Cursor @@ -60,7 +60,7 @@ def set_up_character_and_session(self): class Character(Base): __tablename__ = 'characters' name = sa.Column(sa.String, primary_key=True) - info = sa.Column(Craty) + info = sa.Column(ObjectType) session = Session(bind=self.engine) return session, Character diff --git a/src/crate/client/sqlalchemy/tests/query_caching.py b/src/crate/client/sqlalchemy/tests/query_caching.py index fb4bdec3..037d6423 
100644 --- a/src/crate/client/sqlalchemy/tests/query_caching.py +++ b/src/crate/client/sqlalchemy/tests/query_caching.py @@ -76,7 +76,7 @@ def setup_data(self): self.session.execute(sa.text("REFRESH TABLE testdrive.characters;")) @skipIf(SA_VERSION < SA_1_4, "On SA13, the 'ResultProxy' object has no attribute 'scalar_one'") - def test_object_multiple_select(self): + def test_object_multiple_select_legacy(self): """ The SQLAlchemy implementation of CrateDB's `OBJECT` type offers indexed access to the instance's content in form of a dictionary. Thus, it must @@ -85,6 +85,8 @@ def test_object_multiple_select(self): This test verifies that two subsequent `SELECT` statements are translated well, and don't trip on incorrect SQL compiled statement caching. + + This variant uses direct value matching on the `OBJECT`s attribute. """ self.setup_data() Character = self.Character @@ -97,6 +99,30 @@ def test_object_multiple_select(self): result = self.session.execute(selectable).scalar_one().data self.assertEqual({"y": 2}, result) + @skipIf(SA_VERSION < SA_1_4, "On SA13, the 'ResultProxy' object has no attribute 'scalar_one'") + def test_object_multiple_select_modern(self): + """ + The SQLAlchemy implementation of CrateDB's `OBJECT` type offers indexed + access to the instance's content in form of a dictionary. Thus, it must + not use `cache_ok = True` on its implementation, i.e. this part of the + compiled SQL clause must not be cached. + + This test verifies that two subsequent `SELECT` statements are translated + well, and don't trip on incorrect SQL compiled statement caching. + + This variant uses comparator method matching on the `OBJECT`s attribute. + """ + self.setup_data() + Character = self.Character + + selectable = sa.select(Character).where(Character.data['x'].as_integer() == 1) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"x": 1}, result) + + selectable = sa.select(Character).where(Character.data['y'].as_integer() == 2) + result = self.session.execute(selectable).scalar_one().data + self.assertEqual({"y": 2}, result) + @skipIf(SA_VERSION < SA_1_4, "On SA13, the 'ResultProxy' object has no attribute 'scalar_one'") def test_objectarray_multiple_select(self): """ diff --git a/src/crate/client/sqlalchemy/tests/warnings_test.py b/src/crate/client/sqlalchemy/tests/warnings_test.py index c300ad8c..80023005 100644 --- a/src/crate/client/sqlalchemy/tests/warnings_test.py +++ b/src/crate/client/sqlalchemy/tests/warnings_test.py @@ -8,14 +8,17 @@ class SqlAlchemyWarningsTest(TestCase, ExtraAssertions): + """ + Verify a few `DeprecationWarning` spots. + + https://docs.python.org/3/library/warnings.html#testing-warnings + """ @skipIf(SA_VERSION >= SA_1_4, "There is no deprecation warning for " "SQLAlchemy 1.3 on higher versions") def test_sa13_deprecation_warning(self): """ Verify that a `DeprecationWarning` is issued when running SQLAlchemy 1.3. - - https://docs.python.org/3/library/warnings.html#testing-warnings """ with warnings.catch_warnings(record=True) as w: @@ -31,3 +34,31 @@ def test_sa13_deprecation_warning(self): self.assertEqual(len(w), 1) self.assertIsSubclass(w[-1].category, DeprecationWarning) self.assertIn("SQLAlchemy 1.3 is effectively EOL.", str(w[-1].message)) + + def test_craty_object_deprecation_warning(self): + """ + Verify that a `DeprecationWarning` is issued when accessing the deprecated + module variables `Craty`, and `Object`. The new type is called `ObjectType`. 
+ """ + + with warnings.catch_warnings(record=True) as w: + + # Import the deprecated symbol. + from crate.client.sqlalchemy.types import Craty # noqa: F401 + + # Verify details of the deprecation warning. + self.assertEqual(len(w), 1) + self.assertIsSubclass(w[-1].category, DeprecationWarning) + self.assertIn("Craty is deprecated and will be removed in future releases. " + "Please use ObjectType instead.", str(w[-1].message)) + + with warnings.catch_warnings(record=True) as w: + + # Import the deprecated symbol. + from crate.client.sqlalchemy.types import Object # noqa: F401 + + # Verify details of the deprecation warning. + self.assertEqual(len(w), 1) + self.assertIsSubclass(w[-1].category, DeprecationWarning) + self.assertIn("Object is deprecated and will be removed in future releases. " + "Please use ObjectType instead.", str(w[-1].message)) diff --git a/src/crate/client/sqlalchemy/types.py b/src/crate/client/sqlalchemy/types.py index 587858ac..f9899d92 100644 --- a/src/crate/client/sqlalchemy/types.py +++ b/src/crate/client/sqlalchemy/types.py @@ -18,6 +18,7 @@ # However, if you have executed another commercial license agreement # with Crate these terms will supersede the license and you may use the # software solely pursuant to the terms of the relevant commercial agreement. +import warnings import sqlalchemy.types as sqltypes from sqlalchemy.sql import operators, expression @@ -131,24 +132,31 @@ def __eq__(self, other): return dict.__eq__(self, other) -class _Craty(sqltypes.UserDefinedType): +class ObjectTypeImpl(sqltypes.UserDefinedType, sqltypes.JSON): + + __visit_name__ = "OBJECT" + cache_ok = False + none_as_null = False - class Comparator(sqltypes.TypeEngine.Comparator): - def __getitem__(self, key): - return default_comparator._binary_operate(self.expr, - operators.getitem, - key) +# Designated name to refer to. `Object` is too ambiguous. +ObjectType = MutableDict.as_mutable(ObjectTypeImpl) - def get_col_spec(self): - return 'OBJECT' +# Backward-compatibility aliases. +_deprecated_Craty = ObjectType +_deprecated_Object = ObjectType - type = MutableDict - comparator_factory = Comparator +# https://www.lesinskis.com/deprecating-module-scope-variables.html +deprecated_names = ["Craty", "Object"] -Object = Craty = MutableDict.as_mutable(_Craty) +def __getattr__(name): + if name in deprecated_names: + warnings.warn(f"{name} is deprecated and will be removed in future releases. 
" + f"Please use ObjectType instead.", DeprecationWarning) + return globals()[f"_deprecated_{name}"] + raise AttributeError(f"module {__name__} has no attribute {name}") class Any(expression.ColumnElement): From 9b77a19c0ee0d660f9df52a3cff1822112e769a1 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Wed, 10 May 2023 21:46:28 +0200 Subject: [PATCH 13/18] RTD: Update to Ubuntu 22 and Python 3.11 --- .readthedocs.yml | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .readthedocs.yml diff --git a/.readthedocs.yml b/.readthedocs.yml new file mode 100644 index 00000000..bfc1d655 --- /dev/null +++ b/.readthedocs.yml @@ -0,0 +1,25 @@ +# .readthedocs.yml +# Read the Docs configuration file + +# Details +# - https://docs.readthedocs.io/en/stable/config-file/v2.html + +# Required +version: 2 + +build: + os: "ubuntu-22.04" + tools: + python: "3.11" + +# Build documentation in the docs/ directory with Sphinx +sphinx: + configuration: docs/conf.py + +python: + install: + - requirements: docs/requirements.txt + +# Optionally build your docs in additional formats such as PDF +# formats: +# - pdf From 3267595f5dd281ceaec5d2693645eaa2803107e1 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 11 May 2023 03:15:03 +0200 Subject: [PATCH 14/18] SQLAlchemy: Add `insert_bulk` fast-path `INSERT` method for pandas This method supports efficient batch inserts using CrateDB's bulk operations endpoint. https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations Co-authored-by: hlcianfagna <110453267+hlcianfagna@users.noreply.github.com> --- CHANGES.txt | 3 + docs/by-example/index.rst | 1 + docs/by-example/sqlalchemy/dataframe.rst | 138 ++++++++++++++++++ docs/conf.py | 3 +- setup.py | 1 + src/crate/client/sqlalchemy/support.py | 62 ++++++++ .../client/sqlalchemy/tests/bulk_test.py | 43 +++++- src/crate/client/tests.py | 1 + 8 files changed, 248 insertions(+), 4 deletions(-) create mode 100644 docs/by-example/sqlalchemy/dataframe.rst create mode 100644 src/crate/client/sqlalchemy/support.py diff --git a/CHANGES.txt b/CHANGES.txt index 0bbb7465..6711df53 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,9 @@ Unreleased - SQLAlchemy: Refactor ``OBJECT`` type to use SQLAlchemy's JSON type infrastructure. +- SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in + order to support efficient batch inserts using CrateDB's "bulk operations" endpoint. + 2023/04/18 0.31.1 ================= diff --git a/docs/by-example/index.rst b/docs/by-example/index.rst index 54f90ce8..589beb99 100644 --- a/docs/by-example/index.rst +++ b/docs/by-example/index.rst @@ -48,6 +48,7 @@ its corresponding API interfaces, see also :ref:`sqlalchemy-support`. sqlalchemy/working-with-types sqlalchemy/advanced-querying sqlalchemy/inspection-reflection + sqlalchemy/dataframe .. _Python DB API: https://peps.python.org/pep-0249/ diff --git a/docs/by-example/sqlalchemy/dataframe.rst b/docs/by-example/sqlalchemy/dataframe.rst new file mode 100644 index 00000000..33350869 --- /dev/null +++ b/docs/by-example/sqlalchemy/dataframe.rst @@ -0,0 +1,138 @@ +.. _sqlalchemy-pandas: +.. _sqlalchemy-dataframe: + +================================ +SQLAlchemy: DataFrame operations +================================ + +About +===== + +This section of the documentation demonstrates support for efficient batch/bulk +``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy dialect. 
+ +Efficient bulk operations are needed for typical `ETL`_ batch processing and +data streaming workloads, for example to move data into and out of OLAP data +warehouses, in contrast to interactive online transaction processing (OLTP) +applications. The strategies of `batching`_ together a series of records for +improving performance are also referred to as `chunking`_. + + +Introduction +============ + +The :ref:`pandas DataFrame ` is a structure that contains +two-dimensional data and its corresponding labels. DataFrames are widely used +in data science, machine learning, scientific computing, and many other +data-intensive fields. + +DataFrames are similar to SQL tables or the spreadsheets that you work with in +Excel or Calc. In many cases, DataFrames are faster, easier to use, and more +powerful than tables or spreadsheets because they are an integral part of the +`Python`_ and `NumPy`_ ecosystems. + +The :ref:`pandas I/O subsystem ` for `relational databases`_ +using `SQL`_ is based on `SQLAlchemy`_. + + +.. rubric:: Table of Contents + +.. contents:: :local: + + +Efficient ``INSERT`` operations with pandas +=========================================== + +The package provides an ``insert_bulk`` function to use the +:meth:`pandas:pandas.DataFrame.to_sql` method more efficiently, based on the +`CrateDB bulk operations`_ endpoint. It will effectively split your insert +workload across multiple batches, using a defined chunk size. + + >>> import sqlalchemy as sa >>> from pandas._testing import makeTimeDataFrame >>> from crate.client.sqlalchemy.support import insert_bulk ... >>> # Define number of records, and chunk size. >>> INSERT_RECORDS = 42 >>> CHUNK_SIZE = 8 ... >>> # Create a pandas DataFrame, and connect to CrateDB. >>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S") >>> engine = sa.create_engine(f"crate://{crate_host}") ... >>> # Insert content of DataFrame using batches of records. >>> # Effectively, this is six batches: 42 / 8 = 5.25, rounded up. >>> df.to_sql( ... name="test-testdrive", ... con=engine, ... if_exists="replace", ... index=False, ... chunksize=CHUNK_SIZE, ... method=insert_bulk, ... ) + +.. TIP:: + + You will observe that the optimal chunk size highly depends on the shape of + your data, specifically the width of each record, i.e. the number of columns + and their individual sizes. You will need to determine a good chunk size by + running corresponding experiments on your own behalf. For that purpose, you + can use the `insert_pandas.py`_ program as a blueprint. + + A few details should be taken into consideration when determining the optimal + chunk size for a specific dataset. We are outlining the two major ones. + + - First, when working with data larger than the main memory available on your + machine, each chunk should be small enough to fit into the memory, but large + enough to minimize the overhead of a single data insert operation. Depending + on whether you are running other workloads on the same machine, you should + also account for the total share of heap memory you will assign to each domain, + to prevent overloading the system as a whole. + + - Second, as each batch is submitted using HTTP, you should know about the request + size limits and other constraints of your HTTP infrastructure, which may include + any types of HTTP intermediaries relaying information between your database client + application and your CrateDB cluster.
For example, HTTP proxy servers or load + balancers not optimally configured for performance, or web application firewalls + and intrusion prevention systems may hamper HTTP communication, sometimes in + subtle ways, for example based on request size constraints, or throttling + mechanisms. If you are working with very busy systems, and hosting them on shared + infrastructure, details like `SNAT port exhaustion`_ may also come into play. + + You will need to determine a good chunk size by running corresponding experiments + on your own behalf. For that purpose, you can use the `insert_pandas.py`_ program + as a blueprint. + + It is a good idea to start your explorations with a chunk size of 5_000, and + then see if performance improves when you increase or decrease that figure. + Chunk sizes of 20000 may also be applicable, but make sure to take the limits + of your HTTP infrastructure into consideration. + + In order to learn more about what wide- vs. long-form (tidy, stacked, narrow) + data means in the context of `DataFrame computing`_, let us refer you to `a + general introduction <wide-narrow-general_>`_, the corresponding section in + the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas + tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic. + + +.. hidden: Disconnect from database + + >>> engine.dispose() + + +.. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage +.. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing) +.. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations +.. _Dask: https://en.wikipedia.org/wiki/Dask_(software) +.. _DataFrame computing: https://realpython.com/pandas-dataframe/ +.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load +.. _insert_pandas.py: https://github.com/crate/crate-python/blob/master/examples/insert_pandas.py +.. _NumPy: https://en.wikipedia.org/wiki/NumPy +.. _pandas: https://en.wikipedia.org/wiki/Pandas_(software) +.. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html +.. _Python: https://en.wikipedia.org/wiki/Python_(programming_language) +.. _relational databases: https://en.wikipedia.org/wiki/Relational_database +.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection +.. _SQL: https://en.wikipedia.org/wiki/SQL +.. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html +.. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data +.. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow +.. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data diff --git a/docs/conf.py b/docs/conf.py index 8267b131..f8f8465c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -12,7 +12,8 @@ intersphinx_mapping.update({ 'py': ('https://docs.python.org/3/', None), 'sa': ('https://docs.sqlalchemy.org/en/14/', None), - 'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None) + 'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None), + 'pandas': ('https://pandas.pydata.org/docs/', None), }) diff --git a/setup.py b/setup.py index be1b8a5c..e3d3de47 100644 --- a/setup.py +++ b/setup.py @@ -70,6 +70,7 @@ def read(path): 'createcoverage>=1,<2', 'stopit>=1.1.2,<2', 'flake8>=4,<7', + 'pandas>=2,<3', 'pytz', # `test_http.py` needs `setuptools.ssl_support` 'setuptools<57', diff --git a/src/crate/client/sqlalchemy/support.py b/src/crate/client/sqlalchemy/support.py new file mode 100644 index 00000000..326e41ce --- /dev/null +++ b/src/crate/client/sqlalchemy/support.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8; -*- +# +# Licensed to CRATE Technology GmbH ("Crate") under one or more contributor +# license agreements.
+# license agreements. See the NOTICE file distributed with this work for
+# additional information regarding copyright ownership. Crate licenses
+# this file to you under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License. You may
+# obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# However, if you have executed another commercial license agreement
+# with Crate these terms will supersede the license and you may use the
+# software solely pursuant to the terms of the relevant commercial agreement.
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def insert_bulk(pd_table, conn, keys, data_iter):
+    """
+    Use CrateDB's "bulk operations" endpoint as a fast path for pandas' and Dask's `to_sql()` [1] method.
+
+    The idea is to break out of SQLAlchemy, compile the insert statement, and use the raw
+    DBAPI connection client, in order to invoke a request using `bulk_parameters` [2]::
+
+        cursor.execute(sql=sql, bulk_parameters=data)
+
+    The vanilla implementation, used by SQLAlchemy, is::
+
+        data = [dict(zip(keys, row)) for row in data_iter]
+        conn.execute(pd_table.table.insert(), data)
+
+    Batch chunking will happen outside of this function, for example [3] demonstrates
+    the relevant code in `pandas.io.sql`.
+
+    [1] https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
+    [2] https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+    [3] https://github.com/pandas-dev/pandas/blob/v2.0.1/pandas/io/sql.py#L1011-L1027
+    """
+
+    # Compile SQL statement and materialize batch.
+    sql = str(pd_table.table.insert().compile(bind=conn))
+    data = list(data_iter)
+
+    # For debugging and tracing the batches running through this method.
+    if logger.isEnabledFor(logging.DEBUG):
+        logger.debug(f"Bulk SQL: {sql}")
+        logger.debug(f"Bulk records: {len(data)}")
+        # logger.debug(f"Bulk data: {data}")
+
+    # Invoke bulk insert operation.
+    cursor = conn._dbapi_connection.cursor()
+    cursor.execute(sql=sql, bulk_parameters=data)
+    cursor.close()
diff --git a/src/crate/client/sqlalchemy/tests/bulk_test.py b/src/crate/client/sqlalchemy/tests/bulk_test.py
index 71949f25..317d1fd3 100644
--- a/src/crate/client/sqlalchemy/tests/bulk_test.py
+++ b/src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -18,7 +18,7 @@
 # However, if you have executed another commercial license agreement
 # with Crate these terms will supersede the license and you may use the
 # software solely pursuant to the terms of the relevant commercial agreement.
-
+import math
 from unittest import TestCase, skipIf
 from unittest.mock import patch, MagicMock
 
@@ -36,8 +36,7 @@
 
 fake_cursor = MagicMock(name='fake_cursor')
-FakeCursor = MagicMock(name='FakeCursor', spec=Cursor)
-FakeCursor.return_value = fake_cursor
+FakeCursor = MagicMock(name='FakeCursor', spec=Cursor, return_value=fake_cursor)
 
 
 class SqlAlchemyBulkTest(TestCase):
@@ -168,3 +167,41 @@ def test_bulk_save_modern(self):
             'Callisto', 37,
         )
         self.assertSequenceEqual(expected_bulk_args, bulk_args)
+
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_pandas(self, mock_cursor):
+        """
+        Verify bulk INSERT with pandas.
+ """ + import sqlalchemy as sa + from pandas._testing import makeTimeDataFrame + from crate.client.sqlalchemy.support import insert_bulk + + # 42 records / 8 chunksize = 5.25, which means 6 batches will be emitted. + INSERT_RECORDS = 42 + CHUNK_SIZE = 8 + OPCOUNT = math.ceil(INSERT_RECORDS / CHUNK_SIZE) + + # Create a DataFrame to feed into the database. + df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S") + + dburi = "crate://localhost:4200" + engine = sa.create_engine(dburi, echo=True) + retval = df.to_sql( + name="test-testdrive", + con=engine, + if_exists="replace", + index=False, + chunksize=CHUNK_SIZE, + method=insert_bulk, + ) + self.assertIsNone(retval) + + # Initializing the query has an overhead of two calls to the cursor object, probably one + # initial connection from the DB-API driver, to inquire the database version, and another + # one, for SQLAlchemy. SQLAlchemy will use it to inquire the table schema using `information_schema`, + # and to eventually issue the `CREATE TABLE ...` statement. + effective_op_count = mock_cursor.call_count - 2 + + # Verify number of batches. + self.assertEqual(effective_op_count, OPCOUNT) diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py index 953988ab..455a7a4d 100644 --- a/src/crate/client/tests.py +++ b/src/crate/client/tests.py @@ -388,6 +388,7 @@ def test_suite(): 'docs/by-example/sqlalchemy/working-with-types.rst', 'docs/by-example/sqlalchemy/advanced-querying.rst', 'docs/by-example/sqlalchemy/inspection-reflection.rst', + 'docs/by-example/sqlalchemy/dataframe.rst', module_relative=False, setUp=setUpCrateLayerSqlAlchemy, tearDown=tearDownDropEntitiesSqlAlchemy, From 94de03e7fa4f21d1d60bf7f02639c9bc22dfa527 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 11 May 2023 11:40:31 +0200 Subject: [PATCH 15/18] Documentation: Improve section about batch operations with pandas Specifically, outline _two_ concrete considerations for determining the optimal chunk size, and improve wording. --- docs/by-example/sqlalchemy/dataframe.rst | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/docs/by-example/sqlalchemy/dataframe.rst b/docs/by-example/sqlalchemy/dataframe.rst index 33350869..ffcadf86 100644 --- a/docs/by-example/sqlalchemy/dataframe.rst +++ b/docs/by-example/sqlalchemy/dataframe.rst @@ -76,9 +76,8 @@ workload across multiple batches, using a defined chunk size. You will observe that the optimal chunk size highly depends on the shape of your data, specifically the width of each record, i.e. the number of columns - and their individual sizes. You will need to determine a good chunk size by - running corresponding experiments on your own behalf. For that purpose, you - can use the `insert_pandas.py`_ program as a blueprint. + and their individual sizes, which will in the end determine the total size of + each batch/chunk. A few details should be taken into consideration when determining the optimal chunk size for a specific dataset. We are outlining the two major ones. @@ -106,8 +105,11 @@ workload across multiple batches, using a defined chunk size. It is a good idea to start your explorations with a chunk size of 5_000, and then see if performance improves when you increase or decrease that figure. - Chunk sizes of 20000 may also be applicable, but make sure to take the limits - of your HTTP infrastructure into consideration. 
+    People are reporting that 10_000-20_000 is their optimal setting, but if you
+    process, for example, just three "small" columns, you may also experiment with
+    `leveling up to 200_000`_, because `the chunksize should not be too small`_.
+    If it is too small, the I/O cost will be too high to overcome the benefit of
+    batching.
 
     In order to learn more about what wide- vs. long-form (tidy, stacked, narrow)
     data means in the context of `DataFrame computing`_, let us refer you to `a
     general introduction <wide-narrow-general_>`_, the corresponding section in
     the `Data Computing book <wide-narrow-data-computing_>`_, and a `pandas
     tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.
 
@@ -125,7 +127,8 @@ workload across multiple batches, using a defined chunk size.
 .. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
 .. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
 .. _DataFrame computing: https://realpython.com/pandas-dataframe/
-.. _insert_pandas.py: https://github.com/crate/crate-python/blob/master/examples/insert_pandas.py
+.. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
+.. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
 .. _NumPy: https://en.wikipedia.org/wiki/NumPy
 .. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
 .. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
@@ -133,6 +136,7 @@ workload across multiple batches, using a defined chunk size.
 .. _relational databases: https://en.wikipedia.org/wiki/Relational_database
 .. _SQL: https://en.wikipedia.org/wiki/SQL
 .. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
+.. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
 .. _wide-narrow-general: https://en.wikipedia.org/wiki/Wide_and_narrow_data
 .. _wide-narrow-data-computing: https://dtkaplan.github.io/DataComputingEbook/chap-wide-vs-narrow.html#chap:wide-vs-narrow
 .. _wide-narrow-pandas-tutorial: https://anvil.works/blog/tidy-data
From 061efb082a9def33c1815165d9e071fd7c538150 Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 11 May 2023 13:30:17 +0200
Subject: [PATCH 16/18] SQLAlchemy: Test suite adjustments for pandas software
 tests
---
 docs/conf.py                                   |  1 +
 setup.py                                       |  2 +-
 src/crate/client/sqlalchemy/tests/bulk_test.py |  6 ++++--
 src/crate/client/tests.py                      | 16 ++++++++++++++--
 4 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/docs/conf.py b/docs/conf.py
index f8f8465c..6cea16ec 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -18,6 +18,7 @@

 linkcheck_anchors = True
+linkcheck_ignore = [r"https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/.*"]

 rst_prolog = """
diff --git a/setup.py b/setup.py
index e3d3de47..3055075a 100644
--- a/setup.py
+++ b/setup.py
@@ -70,7 +70,7 @@ def read(path):
         'createcoverage>=1,<2',
         'stopit>=1.1.2,<2',
         'flake8>=4,<7',
-        'pandas>=2,<3',
+        'pandas',
         'pytz',
         # `test_http.py` needs `setuptools.ssl_support`
         'setuptools<57',
diff --git a/src/crate/client/sqlalchemy/tests/bulk_test.py b/src/crate/client/sqlalchemy/tests/bulk_test.py
index 317d1fd3..48b3797c 100644
--- a/src/crate/client/sqlalchemy/tests/bulk_test.py
+++ b/src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -19,13 +19,14 @@
 # with Crate these terms will supersede the license and you may use the
 # software solely pursuant to the terms of the relevant commercial agreement.
import math +import sys from unittest import TestCase, skipIf from unittest.mock import patch, MagicMock import sqlalchemy as sa from sqlalchemy.orm import Session -from crate.client.sqlalchemy.sa_version import SA_VERSION, SA_2_0 +from crate.client.sqlalchemy.sa_version import SA_VERSION, SA_2_0, SA_1_4 try: from sqlalchemy.orm import declarative_base @@ -168,12 +169,13 @@ def test_bulk_save_modern(self): ) self.assertSequenceEqual(expected_bulk_args, bulk_args) + @skipIf(sys.version_info < (3, 8), "SQLAlchemy/pandas is not supported on Python <3.8") + @skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas") @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor) def test_bulk_save_pandas(self, mock_cursor): """ Verify bulk INSERT with pandas. """ - import sqlalchemy as sa from pandas._testing import makeTimeDataFrame from crate.client.sqlalchemy.support import insert_bulk diff --git a/src/crate/client/tests.py b/src/crate/client/tests.py index 455a7a4d..4ce9c950 100644 --- a/src/crate/client/tests.py +++ b/src/crate/client/tests.py @@ -24,6 +24,7 @@ import json import os import socket +import sys import unittest import doctest from pprint import pprint @@ -40,6 +41,7 @@ crate_host, crate_path, crate_port, \ crate_transport_port, docs_path, localhost from crate.client import connect +from .sqlalchemy import SA_VERSION, SA_1_4 from .test_cursor import CursorTest from .test_connection import ConnectionTest @@ -382,13 +384,23 @@ def test_suite(): s.layer = ensure_cratedb_layer() suite.addTest(s) - s = doctest.DocFileSuite( + sqlalchemy_integration_tests = [ 'docs/by-example/sqlalchemy/getting-started.rst', 'docs/by-example/sqlalchemy/crud.rst', 'docs/by-example/sqlalchemy/working-with-types.rst', 'docs/by-example/sqlalchemy/advanced-querying.rst', 'docs/by-example/sqlalchemy/inspection-reflection.rst', - 'docs/by-example/sqlalchemy/dataframe.rst', + ] + + # Don't run DataFrame integration tests on SQLAlchemy 1.3 and Python 3.7. + skip_dataframe = SA_VERSION < SA_1_4 or sys.version_info < (3, 8) + if not skip_dataframe: + sqlalchemy_integration_tests += [ + 'docs/by-example/sqlalchemy/dataframe.rst', + ] + + s = doctest.DocFileSuite( + *sqlalchemy_integration_tests, module_relative=False, setUp=setUpCrateLayerSqlAlchemy, tearDown=tearDownDropEntitiesSqlAlchemy, From 4aeb98cce598149f063ad63d94844b5d70b6baa8 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Thu, 11 May 2023 15:13:51 +0200 Subject: [PATCH 17/18] SQLAlchemy: Add documentation and tests for usage with Dask --- CHANGES.txt | 2 + docs/by-example/sqlalchemy/dataframe.rst | 124 +++++++++++++++++- docs/conf.py | 1 + setup.py | 1 + .../client/sqlalchemy/tests/bulk_test.py | 47 +++++++ 5 files changed, 171 insertions(+), 4 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 6711df53..078296dc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -20,6 +20,8 @@ Unreleased - SQLAlchemy: Added ``insert_bulk`` fast-path ``INSERT`` method for pandas, in order to support efficient batch inserts using CrateDB's "bulk operations" endpoint. +- SQLAlchemy: Add documentation and software tests for usage with Dask + 2023/04/18 0.31.1 ================= diff --git a/docs/by-example/sqlalchemy/dataframe.rst b/docs/by-example/sqlalchemy/dataframe.rst index ffcadf86..a2be1f88 100644 --- a/docs/by-example/sqlalchemy/dataframe.rst +++ b/docs/by-example/sqlalchemy/dataframe.rst @@ -5,6 +5,12 @@ SQLAlchemy: DataFrame operations ================================ +.. rubric:: Table of Contents + +.. 
contents::
+   :local:
+
+
 About
 =====
 
@@ -12,7 +18,7 @@ This section of the documentation demonstrates support for efficient batch/bulk
 ``INSERT`` operations with `pandas`_ and `Dask`_, using the CrateDB SQLAlchemy
 dialect.
 
 Efficient bulk operations are needed for typical `ETL`_ batch processing and
-data streaming workloads, for example to move data in- and out of OLAP data
+data streaming workloads, for example to move data in and out of OLAP data
 warehouses, as contrasted to interactive online transaction processing (OLTP)
 applications. The strategies of `batching`_ together series of records for
 improving performance are also referred to as `chunking`_.
@@ -21,6 +27,8 @@ improving performance are also referred to as `chunking`_.
 
 Introduction
 ============
+pandas
+------
 The :ref:`pandas DataFrame <pandas:api.dataframe>` is a structure that contains
 two-dimensional data and its corresponding labels. DataFrames are widely used
 in data science, machine learning, scientific computing, and many other
@@ -34,11 +42,29 @@ powerful than tables or spreadsheets because they are an integral part of the
 The :ref:`pandas I/O subsystem <pandas:api.io>` for `relational databases`_
 using `SQL`_ is based on `SQLAlchemy`_.
 
+Dask
+----
+`Dask`_ is a flexible library for parallel computing in Python, which scales
+Python code from multi-core local machines to large distributed clusters in
+the cloud. Dask provides a familiar user interface by mirroring the APIs of
+other libraries in the PyData ecosystem, including `pandas`_, `scikit-learn`_,
+and `NumPy`_.
 
-.. rubric:: Table of Contents
+A :doc:`dask:dataframe` is a large parallel DataFrame composed of many smaller
+pandas DataFrames, split along the index. These pandas DataFrames may live on
+disk for larger-than-memory computing on a single machine, or on many different
+machines in a cluster. One Dask DataFrame operation triggers many operations on
+the constituent pandas DataFrames.
 
-.. contents::
-   :local:
+
+Compatibility notes
+===================
+
+.. NOTE::
+
+    Please note that DataFrame support for pandas and Dask is only validated
+    with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend
+    using the most recent versions of those libraries.
 
 
 Efficient ``INSERT`` operations with pandas
 ===========================================
@@ -118,6 +144,93 @@ workload across multiple batches, using a defined chunk size.
 tutorial <wide-narrow-pandas-tutorial_>`_ about the same topic.
 
 
+Efficient ``INSERT`` operations with Dask
+=========================================
+
+The same ``insert_bulk`` function presented in the previous section will also
+be used in the context of `Dask`_, in order to use the
+:func:`dask:dask.dataframe.to_sql` method more efficiently, based on the
+`CrateDB bulk operations`_ endpoint.
+
+The example below will partition your insert workload into equal-sized parts, and
+schedule it to be executed on Dask cluster resources, using a defined number of
+compute partitions. Each worker instance will then insert its partition's records
+in a batched/chunked manner, using a defined chunk size, effectively using the
+pandas implementation introduced in the previous section.
+
+    >>> import dask.dataframe as dd
+    >>> from pandas._testing import makeTimeDataFrame
+    >>> from crate.client.sqlalchemy.support import insert_bulk
+    ...
+    >>> # Define the number of records, the number of computing partitions,
+    >>> # and the chunk size of each database insert operation.
+    >>> INSERT_RECORDS = 100
+    >>> NPARTITIONS = 4
+    >>> CHUNK_SIZE = 25
+    ...
+    >>> # Create a Dask DataFrame.
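+    >>> # (``dd.from_pandas`` splits the pandas DataFrame into NPARTITIONS
+    >>> # smaller frames along its index, one for each compute partition.)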
+    >>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+    >>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
+    ...
+    >>> # Insert content of DataFrame using multiple workers on a
+    >>> # compute cluster, transferred using batches of records.
+    >>> ddf.to_sql(
+    ...     name="test-testdrive",
+    ...     uri=f"crate://{crate_host}",
+    ...     if_exists="replace",
+    ...     index=False,
+    ...     chunksize=CHUNK_SIZE,
+    ...     method=insert_bulk,
+    ...     parallel=True,
+    ... )
+
+
+.. TIP::
+
+    You will observe that optimizing your workload will now also involve determining a
+    good value for the ``NPARTITIONS`` argument, based on the capacity and topology of
+    the available compute resources, and based on workload characteristics or policies
+    like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully
+    dedicated to the problem at hand, you may want to use all available processor cores,
+    while on a shared system, this strategy may not be appropriate.
+
+    If you want to dedicate all available compute resources on your machine, you may want
+    to use the number of CPU cores as a value to the ``NPARTITIONS`` argument. You can find
+    out about the available CPU cores on your machine, for example by running the ``nproc``
+    command in your terminal.
+
+    Depending on the implementation and runtime behavior of the compute task, the optimal
+    number of worker processes, determined by the ``NPARTITIONS`` argument, also needs to be
+    figured out by running a few test iterations. For that purpose, you can use the
+    `insert_dask.py`_ program as a blueprint.
+
+    Adjusting this value in both directions is perfectly fine: If you observe that you are
+    overloading the machine, maybe because other workloads are scheduled besides the one
+    you are running, try to reduce the value. If fragments/steps of your implementation
+    involve waiting for network or disk I/O, you may want to increase the number of workers
+    beyond the number of available CPU cores, to increase utilization. On the other hand,
+    you should be wary of over-committing resources, as doing so may slow your
+    system down.
+
+    Before getting more serious with Dask, you are welcome to read and watch the excellent
+    :doc:`dask:best-practices` and :ref:`dask:dataframe.performance` resources, in order to
+    learn about things to avoid, and beyond. For finding out if your compute workload
+    scheduling is healthy, you can, for example, use Dask's :doc:`dask:dashboard`.
+
+.. WARNING::
+
+    Because the settings assigned in the example above fit together well, the ``to_sql()``
+    instruction will effectively run four insert operations, executed in parallel, and
+    scheduled optimally on the available cluster resources.
+
+    However, if those settings are not chosen sensibly, you can easily misconfigure the
+    resource scheduling system, and overload the underlying hardware or operating system,
+    virtualized or not. Experimenting with different parameters, and a real dataset, is crucial.
+
+
+
 .. hidden: Disconnect from database
 
     >>> engine.dispose()
 
 
 .. _batching: https://en.wikipedia.org/wiki/Batch_processing#Common_batch_processing_usage
 .. _chunking: https://en.wikipedia.org/wiki/Chunking_(computing)
 .. _CrateDB bulk operations: https://crate.io/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+.. _Dask: https://en.wikipedia.org/wiki/Dask_(software)
 .. _DataFrame computing: https://realpython.com/pandas-dataframe/
+.. _ETL: https://en.wikipedia.org/wiki/Extract,_transform,_load
+.. _insert_dask.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_dask.py
 .. _insert_pandas.py: https://github.com/crate/cratedb-examples/blob/main/by-language/python-sqlalchemy/insert_pandas.py
 .. _leveling up to 200_000: https://acepor.github.io/2017/08/03/using-chunksize/
 .. _NumPy: https://en.wikipedia.org/wiki/NumPy
 .. _pandas: https://en.wikipedia.org/wiki/Pandas_(software)
 .. _pandas DataFrame: https://pandas.pydata.org/pandas-docs/stable/reference/frame.html
 .. _Python: https://en.wikipedia.org/wiki/Python_(programming_language)
 .. _relational databases: https://en.wikipedia.org/wiki/Relational_database
+.. _scikit-learn: https://en.wikipedia.org/wiki/Scikit-learn
+.. _SNAT port exhaustion: https://learn.microsoft.com/en-us/azure/load-balancer/troubleshoot-outbound-connection
 .. _SQL: https://en.wikipedia.org/wiki/SQL
 .. _SQLAlchemy: https://aosabook.org/en/v2/sqlalchemy.html
 .. _the chunksize should not be too small: https://acepor.github.io/2017/08/03/using-chunksize/
diff --git a/docs/conf.py b/docs/conf.py
index 6cea16ec..59cc622f 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -13,6 +13,7 @@
     'py': ('https://docs.python.org/3/', None),
     'sa': ('https://docs.sqlalchemy.org/en/14/', None),
     'urllib3': ('https://urllib3.readthedocs.io/en/1.26.13/', None),
+    'dask': ('https://docs.dask.org/en/stable/', None),
     'pandas': ('https://pandas.pydata.org/docs/', None),
 })
diff --git a/setup.py b/setup.py
index 3055075a..5d4ee00b 100644
--- a/setup.py
+++ b/setup.py
@@ -68,6 +68,7 @@ def read(path):
         'zope.testrunner>=5,<7',
         'zc.customdoctests>=1.0.1,<2',
         'createcoverage>=1,<2',
+        'dask',
         'stopit>=1.1.2,<2',
         'flake8>=4,<7',
         'pandas',
diff --git a/src/crate/client/sqlalchemy/tests/bulk_test.py b/src/crate/client/sqlalchemy/tests/bulk_test.py
index 48b3797c..4546d1a4 100644
--- a/src/crate/client/sqlalchemy/tests/bulk_test.py
+++ b/src/crate/client/sqlalchemy/tests/bulk_test.py
@@ -207,3 +207,52 @@ def test_bulk_save_pandas(self, mock_cursor):
 
         # Verify number of batches.
         self.assertEqual(effective_op_count, OPCOUNT)
+
+    @skipIf(sys.version_info < (3, 8), "SQLAlchemy/Dask is not supported on Python <3.8")
+    @skipIf(SA_VERSION < SA_1_4, "SQLAlchemy 1.3 is not supported by pandas")
+    @patch('crate.client.connection.Cursor', mock_cursor=FakeCursor)
+    def test_bulk_save_dask(self, mock_cursor):
+        """
+        Verify bulk INSERT with Dask.
+        """
+        import dask.dataframe as dd
+        from pandas._testing import makeTimeDataFrame
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # 42 records / 4 partitions means each partition has a size of 10.5 elements.
+        # Because the chunk size of 8 is smaller than the partition size of 10.5, a
+        # partition does not fit into a single chunk, so two batches will be emitted
+        # to the database for each data partition: 4 partitions * 2 batches = 8 inserts.
+        # Those settings are a perfect example of non-optimal settings, and have been
+        # made so on purpose, in order to demonstrate that using optimal settings
+        # is crucial.
+        INSERT_RECORDS = 42
+        NPARTITIONS = 4
+        CHUNK_SIZE = 8
+        OPCOUNT = math.ceil(INSERT_RECORDS / NPARTITIONS / CHUNK_SIZE) * NPARTITIONS
+
+        # Create a DataFrame to feed into the database.
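+        # (``makeTimeDataFrame`` creates records indexed by timestamp,
+        # here: 42 records at a frequency of one record per second.)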
+        df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
+        ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
+
+        dburi = "crate://localhost:4200"
+        retval = ddf.to_sql(
+            name="test-testdrive",
+            uri=dburi,
+            if_exists="replace",
+            index=False,
+            chunksize=CHUNK_SIZE,
+            method=insert_bulk,
+            parallel=True,
+        )
+        self.assertIsNone(retval)
+
+        # Each of the insert operations incurs another call to the cursor object. This is
+        # probably the initial connection from the DB-API driver, to inquire the database version.
+        # This compensation formula has been determined empirically / by educated guessing.
+        effective_op_count = (mock_cursor.call_count - 2 * NPARTITIONS) - 2
+
+        # Verify number of batches.
+        self.assertEqual(effective_op_count, OPCOUNT)
From d9473d0c8c6d0ee1e4342e1e9b31b835c99e493d Mon Sep 17 00:00:00 2001
From: Andreas Motl
Date: Thu, 6 Jul 2023 02:03:00 +0200
Subject: [PATCH 18/18] Release 0.32.0
---
 CHANGES.txt                  | 4 ++++
 src/crate/client/__init__.py | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 078296dc..4f58c8d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,10 @@ Changes for crate
 Unreleased
 ==========
 
+
+2023/07/06 0.32.0
+=================
+
 - SQLAlchemy DDL: Allow turning off column store using ``crate_columnstore=False``.
   Thanks, @fetzerms.
 
diff --git a/src/crate/client/__init__.py b/src/crate/client/__init__.py
index 59cbb6e0..604331c2 100644
--- a/src/crate/client/__init__.py
+++ b/src/crate/client/__init__.py
@@ -29,7 +29,7 @@
 # version string read from setup.py using a regex. Take care not to break the
 # regex!
 
-__version__ = "0.31.1"
+__version__ = "0.32.0"
 
 apilevel = "2.0"
 threadsafety = 2