From 077dc3b99bba6b76c111e5fdc525611d44b7a962 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 4 Dec 2023 16:35:14 +0100 Subject: [PATCH 01/31] refactor: standardize bigquery options handling to manage more options --- sqlalchemy_bigquery/base.py | 52 +++++++++++++++++++++++++++---------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 5297f223..2e81bc2e 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -647,25 +647,49 @@ def get_column_specification(self, column, **kwargs): def post_create_table(self, table): bq_opts = table.dialect_options["bigquery"] - opts = [] + + partition_by_or_cluster_by_or_options = [] + + options = {} if ("description" in bq_opts) or table.comment: - description = process_string_literal( - bq_opts.get("description", table.comment) - ) - opts.append(f"description={description}") + description = bq_opts.get("description", table.comment) + options["description"] = description - if "friendly_name" in bq_opts: - opts.append( - "friendly_name={}".format( - process_string_literal(bq_opts["friendly_name"]) - ) - ) + table_option_list = { + "friendly_name": str, + } - if opts: - return "\nOPTIONS({})".format(", ".join(opts)) + for table_option in table_option_list: + if table_option in bq_opts: + options[table_option] = bq_opts.get(table_option) - return "" + if options: + + def process_option_value(option, value): + if option in table_option_list: + assert isinstance( + value, table_option_list[option] + ), f"bigquery_{option} dialect option accepts only {table_option_list[option]}, provided {repr(value)}" + else: + return process_string_literal(value) + + if table_option_list[option] == str: + return process_string_literal(value) + if table_option_list[option] == int: + return int(value) + if table_option_list[option] == bool: + return "true" if value else "false" + if table_option_list[option] == datetime.datetime: + return BQTimestamp.process_timestamp_literal(value) + + options_list = [ + f"{k}={process_option_value(k,v)}" for (k, v) in options.items() + ] + options_str = f"OPTIONS({', '.join(options_list)})" + partition_by_or_cluster_by_or_options.append(options_str) + + return " " + "\n".join(partition_by_or_cluster_by_or_options) def visit_set_table_comment(self, create): table_name = self.preparer.format_table(create.element) From aae635946b459b52b8c920dba9c3750c35e385f4 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 4 Dec 2023 18:28:20 +0100 Subject: [PATCH 02/31] feat: handle table partitioning, table clustering and more table options (expiration_timestamp, expiration_timestamp, require_partition_filter, default_rounding_mode) via create_table dialect options --- sqlalchemy_bigquery/base.py | 33 ++++++- tests/unit/test_table_options.py | 157 +++++++++++++++++++++++++++++++ 2 files changed, 189 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_table_options.py diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 2e81bc2e..787e6b99 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -19,6 +19,7 @@ """Integration between SQLAlchemy and BigQuery.""" +import datetime from decimal import Decimal import random import operator @@ -35,7 +36,7 @@ import sqlalchemy.sql.functions import sqlalchemy.sql.sqltypes import sqlalchemy.sql.type_api -from sqlalchemy.exc import NoSuchTableError +from sqlalchemy.exc import NoSuchTableError, NoSuchColumnError from sqlalchemy import util from 
sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.compiler import ( @@ -650,6 +651,32 @@ def post_create_table(self, table): partition_by_or_cluster_by_or_options = [] + if "partitioning" in bq_opts: + partition_expr = bq_opts.get("partitioning") + + assert isinstance( + partition_expr, str + ), f"bigquery_partitioning dialect option accepts only {str}, provided {repr(partition_expr)}" + + partition_by_or_cluster_by_or_options.append( + f"PARTITION BY {partition_expr}" + ) + + if "clustering_fields" in bq_opts: + clustering_fields = bq_opts.get("clustering_fields") + + assert isinstance( + clustering_fields, list + ), f"bigquery_clustering_fields dialect option accepts only {list}, provided {repr(clustering_fields)}" + + for field in clustering_fields: + if field not in table.c: + raise NoSuchColumnError(field) + + partition_by_or_cluster_by_or_options.append( + f"CLUSTER BY {', '.join(clustering_fields)}" + ) + options = {} if ("description" in bq_opts) or table.comment: @@ -658,6 +685,10 @@ def post_create_table(self, table): table_option_list = { "friendly_name": str, + "expiration_timestamp": datetime.datetime, + "partition_expiration_days": int, + "require_partition_filter": bool, + "default_rounding_mode": str, } for table_option in table_option_list: diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py new file mode 100644 index 00000000..6e1b4697 --- /dev/null +++ b/tests/unit/test_table_options.py @@ -0,0 +1,157 @@ +# Copyright (c) 2021 The sqlalchemy-bigquery Authors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
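For illustration, a minimal sketch of how the dialect options handled above are meant to be used from SQLAlchemy, assuming a hypothetical project, dataset and table (note that the `bigquery_partitioning` option is replaced by `bigquery_time_partitioning` later in this series):

    import datetime
    import sqlalchemy

    # Connecting requires real BigQuery credentials; the URL below is a placeholder.
    engine = sqlalchemy.create_engine("bigquery://some-project/some_dataset")
    metadata = sqlalchemy.MetaData()

    events = sqlalchemy.Table(
        "events",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer),
        sqlalchemy.Column("createdAt", sqlalchemy.DateTime),
        bigquery_partitioning="DATE(createdAt)",
        bigquery_clustering_fields=["id"],
        bigquery_expiration_timestamp=datetime.datetime.fromisoformat(
            "2038-01-01T00:00:00+00:00"
        ),
        bigquery_require_partition_filter=True,
    )

    # Emits CREATE TABLE ... PARTITION BY DATE(createdAt) CLUSTER BY id
    # OPTIONS(expiration_timestamp=..., require_partition_filter=true)
    metadata.create_all(engine)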
+ +import datetime +import sqlite3 +import pytest +import sqlalchemy + +from .conftest import setup_table + + +def test_table_expiration_timestamp_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_expiration_timestamp=datetime.datetime.fromisoformat( + "2038-01-01T00:00:00+00:00" + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00')" + ) + + +def test_table_partition_expiration_days_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_partition_expiration_days=30, + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(partition_expiration_days=30)" + ) + + +def test_table_require_partition_filter_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_require_partition_filter=True, + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(require_partition_filter=true)" + ) + + +def test_table_default_rounding_mode_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(default_rounding_mode='ROUND_HALF_EVEN')" + ) + + +def test_table_clustering_fields_dialect_option_no_such_column(faux_conn): + with pytest.raises(sqlalchemy.exc.NoSuchColumnError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_clustering_fields=["country", "unknown"], + ) + + +def test_table_clustering_fields_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support clustering + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + bigquery_clustering_fields=["country", "town"], + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING )" + " CLUSTER BY country, town" + ) + + +def test_table_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_partitioning="DATE(createdAt)", + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE(createdAt)" + ) + + +def test_table_all_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support clustering and partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + 
"some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_expiration_timestamp=datetime.datetime.fromisoformat( + "2038-01-01T00:00:00+00:00" + ), + bigquery_partition_expiration_days=30, + bigquery_require_partition_filter=True, + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_clustering_fields=["country", "town"], + bigquery_partitioning="DATE(createdAt)", + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )" + " PARTITION BY DATE(createdAt)" + " CLUSTER BY country, town" + " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', partition_expiration_days=30, require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" + ) From f1f334b9f35b245c046452a6ad827426141402c5 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 4 Dec 2023 23:41:01 +0100 Subject: [PATCH 03/31] fix: having clustering fields and partitioning exposed has table indexes leads to bad autogenerated version file def upgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.drop_index('clustering', table_name='dataset.some_table') op.drop_index('partition', table_name='dataset.some_table') # ### end Alembic commands ### def downgrade() -> None: # ### commands auto generated by Alembic - please adjust! ### op.create_index('partition', 'dataset.some_table', ['createdAt'], unique=False) op.create_index('clustering', 'dataset.some_table', ['id', 'createdAt'], unique=False) # ### end Alembic commands ### --- sqlalchemy_bigquery/base.py | 21 ++------------------- tests/unit/test_catalog_functions.py | 10 ---------- 2 files changed, 2 insertions(+), 29 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 787e6b99..9a22fb76 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -1044,25 +1044,8 @@ def get_pk_constraint(self, connection, table_name, schema=None, **kw): return {"constrained_columns": []} def get_indexes(self, connection, table_name, schema=None, **kw): - table = self._get_table(connection, table_name, schema) - indexes = [] - if table.time_partitioning: - indexes.append( - { - "name": "partition", - "column_names": [table.time_partitioning.field], - "unique": False, - } - ) - if table.clustering_fields: - indexes.append( - { - "name": "clustering", - "column_names": table.clustering_fields, - "unique": False, - } - ) - return indexes + # BigQuery has no support for indexes. 
+ return [] def get_schema_names(self, connection, **kw): if isinstance(connection, Engine): diff --git a/tests/unit/test_catalog_functions.py b/tests/unit/test_catalog_functions.py index 78614c9f..56b120f4 100644 --- a/tests/unit/test_catalog_functions.py +++ b/tests/unit/test_catalog_functions.py @@ -127,16 +127,6 @@ def test_get_indexes(faux_conn): client.tables.foo.clustering_fields = ["user_email", "store_code"] assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [ - dict( - name="partition", - column_names=["tm"], - unique=False, - ), - dict( - name="clustering", - column_names=["user_email", "store_code"], - unique=False, - ), ] From 16c63e9eaec8c52b45f3750cd9988f39daee314b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 5 Dec 2023 12:43:44 +0100 Subject: [PATCH 04/31] docs: update README to describe how to create clustered and partitioned table as well as other newly supported table options --- README.rst | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index a2036289..b014bea3 100644 --- a/README.rst +++ b/README.rst @@ -292,7 +292,12 @@ To add metadata to a table: .. code-block:: python - table = Table('mytable', ..., bigquery_description='my table description', bigquery_friendly_name='my table friendly name') + table = Table('mytable', ..., + bigquery_description='my table description', + bigquery_friendly_name='my table friendly name', + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_expiration_timestamp=datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"), + ) To add metadata to a column: @@ -300,6 +305,22 @@ To add metadata to a column: Column('mycolumn', doc='my column description') +To create a clustered table: + +.. code-block:: python + + table = Table('mytable', ..., bigquery_clustering_fields=["a", "b", "c"]) + +To create a partitioned table: + +.. 
code-block:: python + + table = Table('mytable', ..., + bigquery_partitioning="DATE(mytimestamp)", + bigquery_partition_expiration_days=90, + bigquery_require_partition_filter=True, + ) + Threading and Multiprocessing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ From 2cae63017db2c40e620500947b35056ad62f89af Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sat, 9 Dec 2023 12:06:32 +0100 Subject: [PATCH 05/31] test: adjust system tests since indexes are no longer populated from table partitions and clustering info --- tests/system/test_sqlalchemy_bigquery.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 62b534ff..b0932c61 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -594,17 +594,8 @@ def test_view_names(inspector, inspector_using_test_dataset, bigquery_dataset): def test_get_indexes(inspector, inspector_using_test_dataset, bigquery_dataset): for _ in [f"{bigquery_dataset}.sample", f"{bigquery_dataset}.sample_one_row"]: indexes = inspector.get_indexes(f"{bigquery_dataset}.sample") - assert len(indexes) == 2 - assert indexes[0] == { - "name": "partition", - "column_names": ["timestamp"], - "unique": False, - } - assert indexes[1] == { - "name": "clustering", - "column_names": ["integer", "string"], - "unique": False, - } + assert len(indexes) == 0 + def test_get_columns(inspector, inspector_using_test_dataset, bigquery_dataset): From 913c4fc7e813b002054821793ebdc5f45eaf5115 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 10 Dec 2023 14:16:20 +0100 Subject: [PATCH 06/31] test: alembic now supports creating partitioned tables --- tests/system/test_alembic.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tests/system/test_alembic.py b/tests/system/test_alembic.py index 1948a19a..c5422c46 100644 --- a/tests/system/test_alembic.py +++ b/tests/system/test_alembic.py @@ -138,15 +138,12 @@ def test_alembic_scenario(alembic_table): op.drop_table("accounts") assert alembic_table("accounts") is None - op.execute( - """ - create table transactions( - account INT64 NOT NULL, - transaction_time DATETIME NOT NULL, - amount NUMERIC(11, 2) NOT NULL - ) - partition by DATE(transaction_time) - """ + op.create_table( + "transactions", + Column("account", Integer, nullable=False), + Column("transaction_time", DateTime(), nullable=False), + Column("amount", Numeric(11, 2), nullable=False), + bigquery_partitioning="DATE(transaction_time)", ) op.alter_column("transactions", "amount", nullable=True) From 39bbd5662ccd4c019cd183454ee18005f5da52bf Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 11 Dec 2023 10:05:11 +0100 Subject: [PATCH 07/31] test: run integration tests with all the new create_table options --- tests/system/test_sqlalchemy_bigquery.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index b0932c61..79360746 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -539,6 +539,12 @@ def test_create_table(engine, bigquery_dataset): Column("binary_c", sqlalchemy.BINARY), bigquery_description="test table description", bigquery_friendly_name="test table name", + bigquery_expiration_timestamp=datetime.datetime(2183, 3, 26, 8, 30, 0), + bigquery_partitioning="DATE(timestamp_c)", + bigquery_partition_expiration_days=30, + bigquery_require_partition_filter=True, + 
bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_clustering_fields=["integer_c", "decimal_c"], ) meta.create_all(engine) meta.drop_all(engine) From 743d5a4b4d301268ccf0a00df32551c049c37e9f Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 20 Dec 2023 23:47:13 +0100 Subject: [PATCH 08/31] chore: rename variables to represent what it is a bit more clearly --- sqlalchemy_bigquery/base.py | 39 ++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index d85a7b83..38dfccd2 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -657,7 +657,7 @@ def get_column_specification(self, column, **kwargs): def post_create_table(self, table): bq_opts = table.dialect_options["bigquery"] - partition_by_or_cluster_by_or_options = [] + clauses = [] if "partitioning" in bq_opts: partition_expr = bq_opts.get("partitioning") @@ -666,9 +666,7 @@ def post_create_table(self, table): partition_expr, str ), f"bigquery_partitioning dialect option accepts only {str}, provided {repr(partition_expr)}" - partition_by_or_cluster_by_or_options.append( - f"PARTITION BY {partition_expr}" - ) + clauses.append(f"PARTITION BY {partition_expr}") if "clustering_fields" in bq_opts: clustering_fields = bq_opts.get("clustering_fields") @@ -681,9 +679,7 @@ def post_create_table(self, table): if field not in table.c: raise NoSuchColumnError(field) - partition_by_or_cluster_by_or_options.append( - f"CLUSTER BY {', '.join(clustering_fields)}" - ) + clauses.append(f"CLUSTER BY {', '.join(clustering_fields)}") options = {} @@ -691,7 +687,7 @@ def post_create_table(self, table): description = bq_opts.get("description", table.comment) options["description"] = description - table_option_list = { + option_datatype_mapping = { "friendly_name": str, "expiration_timestamp": datetime.datetime, "partition_expiration_days": int, @@ -699,36 +695,35 @@ def post_create_table(self, table): "default_rounding_mode": str, } - for table_option in table_option_list: - if table_option in bq_opts: - options[table_option] = bq_opts.get(table_option) + for option in option_datatype_mapping: + if option in bq_opts: + options[option] = bq_opts.get(option) if options: def process_option_value(option, value): - if option in table_option_list: + if option in option_datatype_mapping: assert isinstance( - value, table_option_list[option] - ), f"bigquery_{option} dialect option accepts only {table_option_list[option]}, provided {repr(value)}" + value, option_datatype_mapping[option] + ), f"bigquery_{option} dialect option accepts only {option_datatype_mapping[option]}, provided {repr(value)}" else: return process_string_literal(value) - if table_option_list[option] == str: + if option_datatype_mapping[option] == str: return process_string_literal(value) - if table_option_list[option] == int: + if option_datatype_mapping[option] == int: return int(value) - if table_option_list[option] == bool: + if option_datatype_mapping[option] == bool: return "true" if value else "false" - if table_option_list[option] == datetime.datetime: + if option_datatype_mapping[option] == datetime.datetime: return BQTimestamp.process_timestamp_literal(value) - options_list = [ + individual_option_statements = [ f"{k}={process_option_value(k,v)}" for (k, v) in options.items() ] - options_str = f"OPTIONS({', '.join(options_list)})" - partition_by_or_cluster_by_or_options.append(options_str) + clauses.append(f"OPTIONS({', 
'.join(individual_option_statements)})") - return " " + "\n".join(partition_by_or_cluster_by_or_options) + return " " + "\n".join(clauses) def visit_set_table_comment(self, create): table_name = self.preparer.format_table(create.element) From 4fd649cb39c5dadb64e1251a14ce6c6cfb2644c4 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 00:13:03 +0100 Subject: [PATCH 09/31] fix: assertions should no be used to validate user inputs --- sqlalchemy_bigquery/base.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 38dfccd2..b655d624 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -662,18 +662,22 @@ def post_create_table(self, table): if "partitioning" in bq_opts: partition_expr = bq_opts.get("partitioning") - assert isinstance( - partition_expr, str - ), f"bigquery_partitioning dialect option accepts only {str}, provided {repr(partition_expr)}" + if not isinstance(partition_expr, str): + raise TypeError( + f"bigquery_partitioning dialect option accepts only {str}," + f"provided {repr(partition_expr)}" + ) clauses.append(f"PARTITION BY {partition_expr}") if "clustering_fields" in bq_opts: clustering_fields = bq_opts.get("clustering_fields") - assert isinstance( - clustering_fields, list - ), f"bigquery_clustering_fields dialect option accepts only {list}, provided {repr(clustering_fields)}" + if not isinstance(clustering_fields, list): + raise TypeError( + f"bigquery_clustering_fields dialect option accepts only {list}," + f"provided {repr(clustering_fields)}" + ) for field in clustering_fields: if field not in table.c: @@ -703,9 +707,11 @@ def post_create_table(self, table): def process_option_value(option, value): if option in option_datatype_mapping: - assert isinstance( - value, option_datatype_mapping[option] - ), f"bigquery_{option} dialect option accepts only {option_datatype_mapping[option]}, provided {repr(value)}" + if not isinstance(value, option_datatype_mapping[option]): + raise TypeError( + f"bigquery_{option} dialect option accepts only {option_datatype_mapping[option]}," + f"provided {repr(value)}" + ) else: return process_string_literal(value) From e9aeeddf71b6f831ca32c36f0c48beb38c2067e5 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 00:18:03 +0100 Subject: [PATCH 10/31] refactor: extract process_option_value() from post_create_table() for improved readability --- sqlalchemy_bigquery/base.py | 60 ++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index b655d624..b7c1124a 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -632,6 +632,14 @@ def visit_NUMERIC(self, type_, **kw): class BigQueryDDLCompiler(DDLCompiler): + option_datatype_mapping = { + "friendly_name": str, + "expiration_timestamp": datetime.datetime, + "partition_expiration_days": int, + "require_partition_filter": bool, + "default_rounding_mode": str, + } + # BigQuery has no support for foreign keys. 
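The move away from `assert` matters because assertions are stripped entirely when Python runs with the `-O` flag, so they cannot be relied on to guard user-supplied dialect options. A minimal sketch of the resulting behaviour, assuming hypothetical table and column names:

    import sqlalchemy
    from sqlalchemy.schema import CreateTable
    from sqlalchemy_bigquery.base import BigQueryDialect

    bad_table = sqlalchemy.Table(
        "bad_table",
        sqlalchemy.MetaData(),
        sqlalchemy.Column("id", sqlalchemy.Integer),
        bigquery_clustering_fields="id",  # wrong type: should be a list such as ["id"]
    )

    try:
        CreateTable(bad_table).compile(dialect=BigQueryDialect())
    except TypeError as exc:
        print(exc)  # bigquery_clustering_fields dialect option accepts only <class 'list'>, ...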
def visit_foreign_key_constraint(self, constraint): return None @@ -691,41 +699,14 @@ def post_create_table(self, table): description = bq_opts.get("description", table.comment) options["description"] = description - option_datatype_mapping = { - "friendly_name": str, - "expiration_timestamp": datetime.datetime, - "partition_expiration_days": int, - "require_partition_filter": bool, - "default_rounding_mode": str, - } - - for option in option_datatype_mapping: + for option in self.option_datatype_mapping: if option in bq_opts: options[option] = bq_opts.get(option) if options: - - def process_option_value(option, value): - if option in option_datatype_mapping: - if not isinstance(value, option_datatype_mapping[option]): - raise TypeError( - f"bigquery_{option} dialect option accepts only {option_datatype_mapping[option]}," - f"provided {repr(value)}" - ) - else: - return process_string_literal(value) - - if option_datatype_mapping[option] == str: - return process_string_literal(value) - if option_datatype_mapping[option] == int: - return int(value) - if option_datatype_mapping[option] == bool: - return "true" if value else "false" - if option_datatype_mapping[option] == datetime.datetime: - return BQTimestamp.process_timestamp_literal(value) - individual_option_statements = [ - f"{k}={process_option_value(k,v)}" for (k, v) in options.items() + "{}={}".format(k, self._process_option_value(k, v)) + for (k, v) in options.items() ] clauses.append(f"OPTIONS({', '.join(individual_option_statements)})") @@ -742,6 +723,25 @@ def visit_drop_table_comment(self, drop): table_name = self.preparer.format_table(drop.element) return f"ALTER TABLE {table_name} SET OPTIONS(description=null)" + def _process_option_value(self, option, value): + if option in self.option_datatype_mapping: + if not isinstance(value, self.option_datatype_mapping[option]): + raise TypeError( + f"bigquery_{option} dialect option accepts only {self.option_datatype_mapping[option]}," + f"provided {repr(value)}" + ) + else: + return process_string_literal(value) + + if self.option_datatype_mapping[option] == str: + return process_string_literal(value) + if self.option_datatype_mapping[option] == int: + return int(value) + if self.option_datatype_mapping[option] == bool: + return "true" if value else "false" + if self.option_datatype_mapping[option] == datetime.datetime: + return BQTimestamp.process_timestamp_literal(value) + def process_string_literal(value): return repr(value.replace("%", "%%")) From bbd19ce32e8d3c04fd9c141f630e61e7f5d87273 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 00:41:48 +0100 Subject: [PATCH 11/31] docs: add docstring to post_create_table() and _process_option_value() --- sqlalchemy_bigquery/base.py | 38 ++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index b7c1124a..c63c78e8 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -663,6 +663,24 @@ def get_column_specification(self, column, **kwargs): return colspec def post_create_table(self, table): + """ + Constructs additional SQL clauses for table creation in BigQuery. + + This function processes the BigQuery dialect-specific options and generates SQL clauses for partitioning, + clustering, and other table options. + + Args: + table (Table): The SQLAlchemy Table object for which the SQL is being generated. 
+ + Returns: + str: A string composed of SQL clauses for partitioning, clustering, and other BigQuery specific + options, each separated by a newline. Returns an empty string if no such options are specified. + + Raises: + TypeError: If the partitioning option is not a string or if the clustering_fields option is not a list. + NoSuchColumnError: If any field specified in clustering_fields does not exist in the table. + """ + bq_opts = table.dialect_options["bigquery"] clauses = [] @@ -723,7 +741,25 @@ def visit_drop_table_comment(self, drop): table_name = self.preparer.format_table(drop.element) return f"ALTER TABLE {table_name} SET OPTIONS(description=null)" - def _process_option_value(self, option, value): + def _process_option_value(self, option: str, value): + """ + Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. + + This transformation is guided by `self.option_datatype_mapping`, which maps + option to their expected data type. + + Args: + option (str): The name of the dialect option. + value: The value to be transformed. The type of this parameter depends on + the specific option being processed. + + Returns: + The processed value in a format suitable for inclusion in a SQL query. + + Raises: + TypeError: If the type of the provided value does not match the expected type as defined in + `self.option_datatype_mapping`. + """ if option in self.option_datatype_mapping: if not isinstance(value, self.option_datatype_mapping[option]): raise TypeError( From b56979fbd2047119c4be5cb18a227940eb79528b Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 15:39:10 +0100 Subject: [PATCH 12/31] test: increase code coverage by testing error cases --- tests/unit/test_table_options.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 6e1b4697..7d661692 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -112,6 +112,19 @@ def test_table_clustering_fields_dialect_option(faux_conn): ) +def test_table_clustering_fields_dialect_option_type_error(faux_conn): + # expect TypeError when bigquery_clustering_fields is not a list + with pytest.raises(TypeError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + bigquery_clustering_fields="country, town", + ) + + def test_table_partitioning_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): @@ -129,6 +142,18 @@ def test_table_partitioning_dialect_option(faux_conn): ) +def test_table_partitioning_dialect_option_type_error(faux_conn): + # expect TypeError when bigquery_partitioning is not a string + with pytest.raises(TypeError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_partitioning=["DATE(createdAt)"], + ) + + def test_table_all_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support clustering and partitioned tables with pytest.raises(sqlite3.OperationalError): From 285e32d121697d56e744c40e1186ed01fb627307 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 15:39:18 +0100 Subject: [PATCH 13/31] refactor: better represent the distinction between the option value data type check and 
the transformation in SQL literal --- sqlalchemy_bigquery/base.py | 54 +++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index c63c78e8..f28b0b69 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -723,8 +723,9 @@ def post_create_table(self, table): if options: individual_option_statements = [ - "{}={}".format(k, self._process_option_value(k, v)) + "{}={}".format(k, self._process_option_value(v)) for (k, v) in options.items() + if self._validate_option_value_type(k, v) ] clauses.append(f"OPTIONS({', '.join(individual_option_statements)})") @@ -741,43 +742,56 @@ def visit_drop_table_comment(self, drop): table_name = self.preparer.format_table(drop.element) return f"ALTER TABLE {table_name} SET OPTIONS(description=null)" - def _process_option_value(self, option: str, value): + def _validate_option_value_type(self, option: str, value): """ - Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. - - This transformation is guided by `self.option_datatype_mapping`, which maps - option to their expected data type. + Validates the type of the given option value against the expected data type. Args: - option (str): The name of the dialect option. - value: The value to be transformed. The type of this parameter depends on - the specific option being processed. + option (str): The name of the option to be validated. + value: The value of the dialect option whose type is to be checked. The type of this parameter + is dynamic and is verified against the expected type in `self.option_datatype_mapping`. Returns: - The processed value in a format suitable for inclusion in a SQL query. + bool: True if the type of the value matches the expected type, or if the option is not found in + `self.option_datatype_mapping`. Raises: TypeError: If the type of the provided value does not match the expected type as defined in - `self.option_datatype_mapping`. + `self.option_datatype_mapping`. """ if option in self.option_datatype_mapping: - if not isinstance(value, self.option_datatype_mapping[option]): + if type(value) is not self.option_datatype_mapping[option]: raise TypeError( f"bigquery_{option} dialect option accepts only {self.option_datatype_mapping[option]}," f"provided {repr(value)}" ) - else: - return process_string_literal(value) - if self.option_datatype_mapping[option] == str: - return process_string_literal(value) - if self.option_datatype_mapping[option] == int: - return int(value) - if self.option_datatype_mapping[option] == bool: + return True + + def _process_option_value(self, value): + """ + Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. + + Args: + value: The value to be transformed. + + Returns: + The processed value in a format suitable for inclusion in a SQL query. + + Raises: + NotImplementedError: When there is no transformation registered for a data type. 
+ """ + if isinstance(value, bool): return "true" if value else "false" - if self.option_datatype_mapping[option] == datetime.datetime: + elif isinstance(value, int): + return int(value) + elif isinstance(value, str): + return process_string_literal(value) + elif isinstance(value, datetime.datetime): return BQTimestamp.process_timestamp_literal(value) + raise NotImplementedError(f"No transformation registered for {repr(value)}") + def process_string_literal(value): return repr(value.replace("%", "%%")) From f756cb148bee9c81c91362eed3059807e24c43dd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 16:47:02 +0100 Subject: [PATCH 14/31] test: adding test cases for _validate_option_value_type() and _process_option_value() --- tests/unit/conftest.py | 7 +++ tests/unit/test_table_options.py | 93 ++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index f808b380..6f197196 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -25,6 +25,8 @@ import pytest import sqlalchemy +from sqlalchemy_bigquery.base import BigQueryDDLCompiler, BigQueryDialect + from . import fauxdbi sqlalchemy_version = packaging.version.parse(sqlalchemy.__version__) @@ -91,6 +93,11 @@ def metadata(): return sqlalchemy.MetaData() +@pytest.fixture() +def ddl_compiler(): + return BigQueryDDLCompiler(BigQueryDialect(), None) + + def setup_table(connection, name, *columns, initial_data=(), **kw): metadata = sqlalchemy.MetaData() table = sqlalchemy.Table(name, metadata, *columns, **kw) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 7d661692..d87fe599 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -180,3 +180,96 @@ def test_table_all_dialect_option(faux_conn): " CLUSTER BY country, town" " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', partition_expiration_days=30, require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" ) + + +def test_validate_friendly_name_value_type(ddl_compiler): + # expect option value to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type("friendly_name", "Friendly name") + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("friendly_name", 1983) + + +def test_validate_expiration_timestamp_value_type(ddl_compiler): + # expect option value to be transformed as a timestamp expression + + assert ddl_compiler._validate_option_value_type( + "expiration_timestamp", + datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"), + ) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("expiration_timestamp", "2038-01-01") + + +def test_validate_partition_expiration_days_type(ddl_compiler): + # expect option value to be transformed as an integer + + assert ddl_compiler._validate_option_value_type("partition_expiration_days", 90) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("partition_expiration_days", "90") + + +def test_validate_require_partition_filter_type(ddl_compiler): + # expect option value to be transformed as a literal boolean + + assert ddl_compiler._validate_option_value_type("require_partition_filter", True) + assert ddl_compiler._validate_option_value_type("require_partition_filter", False) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("partition_expiration_days", "true") + + with pytest.raises(TypeError): + 
ddl_compiler._validate_option_value_type("partition_expiration_days", "false") + + +def test_validate_default_rounding_mode_type(ddl_compiler): + # expect option value to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type( + "default_rounding_mode", "ROUND_HALF_EVEN" + ) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("partition_expiration_days", True) + + +def test_validate_unmapped_option_type(ddl_compiler): + # expect option value with no typed specified in mapping to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type("unknown", "DEFAULT_IS_STRING") + + +def test_process_str_option_value(ddl_compiler): + # expect string to be transformed as a string expression + assert ddl_compiler._process_option_value("Some text") == "'Some text'" + + +def test_process_datetime_value(ddl_compiler): + # expect datetime object to be transformed as a timestamp expression + assert ( + ddl_compiler._process_option_value( + datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00") + ) + == "TIMESTAMP '2038-01-01 00:00:00+00:00'" + ) + + +def test_process_int_option_value(ddl_compiler): + # expect int to be unchanged + assert ddl_compiler._process_option_value(90) == 90 + + +def test_process_boolean_option_value(ddl_compiler): + # expect boolean to be transformed as a literal boolean expression + + assert ddl_compiler._process_option_value(True) == "true" + assert ddl_compiler._process_option_value(False) == "false" + + +def test_process_not_implementer_option_value(ddl_compiler): + # expect to raise + with pytest.raises(NotImplementedError): + ddl_compiler._process_option_value(float) From cc81f77655c732ce14444b26a6742fe947a1da3e Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 17:00:25 +0100 Subject: [PATCH 15/31] chore: coding style --- sqlalchemy_bigquery/base.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index f28b0b69..37ce2e01 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -781,14 +781,16 @@ def _process_option_value(self, value): Raises: NotImplementedError: When there is no transformation registered for a data type. 
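String-valued options pass through `process_string_literal`, which doubles any `%` before taking `repr`, presumably so the literal survives the DB-API's pyformat-style parameter handling. A small worked example of that helper as it exists in the module:

    def process_string_literal(value):
        return repr(value.replace("%", "%%"))

    assert process_string_literal("Some text") == "'Some text'"
    assert process_string_literal("50% off") == "'50%% off'"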
""" - if isinstance(value, bool): - return "true" if value else "false" - elif isinstance(value, int): - return int(value) - elif isinstance(value, str): - return process_string_literal(value) - elif isinstance(value, datetime.datetime): - return BQTimestamp.process_timestamp_literal(value) + option_casting = { + # Mapping from option type to its casting method + str: lambda x: process_string_literal(x), + int: lambda x: x, + bool: lambda x: "true" if x else "false", + datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x), + } + + if (option_cast := option_casting.get(type(value))) is not None: + return option_cast(value) raise NotImplementedError(f"No transformation registered for {repr(value)}") From 1795d16c052b4c5f72a3b968889e4accdf646692 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 21 Dec 2023 18:01:49 +0100 Subject: [PATCH 16/31] chore: reformat files with black --- tests/system/test_sqlalchemy_bigquery.py | 1 - tests/unit/test_catalog_functions.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 79360746..458a5be3 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -603,7 +603,6 @@ def test_get_indexes(inspector, inspector_using_test_dataset, bigquery_dataset): assert len(indexes) == 0 - def test_get_columns(inspector, inspector_using_test_dataset, bigquery_dataset): columns_without_schema = inspector.get_columns(f"{bigquery_dataset}.sample") columns_schema = inspector.get_columns("sample", bigquery_dataset) diff --git a/tests/unit/test_catalog_functions.py b/tests/unit/test_catalog_functions.py index 56b120f4..7eab7b7b 100644 --- a/tests/unit/test_catalog_functions.py +++ b/tests/unit/test_catalog_functions.py @@ -126,8 +126,7 @@ def test_get_indexes(faux_conn): client.tables.foo.time_partitioning = TimePartitioning(field="tm") client.tables.foo.clustering_fields = ["user_email", "store_code"] - assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [ - ] + assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [] def test_no_table_pk_constraint(faux_conn): From 89cf5a905584edfb1af31711da2401f7c89ba065 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 00:17:01 +0100 Subject: [PATCH 17/31] test: typo in tests --- tests/unit/test_table_options.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index d87fe599..f46676cd 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -219,10 +219,10 @@ def test_validate_require_partition_filter_type(ddl_compiler): assert ddl_compiler._validate_option_value_type("require_partition_filter", False) with pytest.raises(TypeError): - ddl_compiler._validate_option_value_type("partition_expiration_days", "true") + ddl_compiler._validate_option_value_type("require_partition_filter", "true") with pytest.raises(TypeError): - ddl_compiler._validate_option_value_type("partition_expiration_days", "false") + ddl_compiler._validate_option_value_type("require_partition_filter", "false") def test_validate_default_rounding_mode_type(ddl_compiler): @@ -233,7 +233,7 @@ def test_validate_default_rounding_mode_type(ddl_compiler): ) with pytest.raises(TypeError): - ddl_compiler._validate_option_value_type("partition_expiration_days", True) + ddl_compiler._validate_option_value_type("default_rounding_mode", True) def 
test_validate_unmapped_option_type(ddl_compiler): From 1944f95afd843dad2c8d4a2808e031a1f3fdb2ac Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 00:21:14 +0100 Subject: [PATCH 18/31] feat: change the option name for partitioning to leverage the TimePartitioning interface of the Python Client for Google BigQuery --- sqlalchemy_bigquery/base.py | 92 ++++++++++++++++++++------- tests/unit/test_table_options.py | 104 +++++++++++++++++++++---------- 2 files changed, 141 insertions(+), 55 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 37ce2e01..db26e68f 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -28,7 +28,10 @@ from google import auth import google.api_core.exceptions from google.cloud.bigquery import dbapi -from google.cloud.bigquery.table import TableReference +from google.cloud.bigquery.table import ( + TableReference, + TimePartitioning, +) from google.api_core.exceptions import NotFound import packaging.version import sqlalchemy @@ -635,7 +638,6 @@ class BigQueryDDLCompiler(DDLCompiler): option_datatype_mapping = { "friendly_name": str, "expiration_timestamp": datetime.datetime, - "partition_expiration_days": int, "require_partition_filter": bool, "default_rounding_mode": str, } @@ -673,37 +675,44 @@ def post_create_table(self, table): table (Table): The SQLAlchemy Table object for which the SQL is being generated. Returns: - str: A string composed of SQL clauses for partitioning, clustering, and other BigQuery specific + str: A string composed of SQL clauses for time partitioning, clustering, and other BigQuery specific options, each separated by a newline. Returns an empty string if no such options are specified. Raises: - TypeError: If the partitioning option is not a string or if the clustering_fields option is not a list. + TypeError: If the time_partitioning option is not a `TimePartitioning` object or if the clustering_fields option is not a list. NoSuchColumnError: If any field specified in clustering_fields does not exist in the table. 
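A minimal usage sketch mirroring the updated tests, assuming hypothetical table and column names; note how `expiration_ms` on the `TimePartitioning` object is surfaced as a `partition_expiration_days` option:

    import sqlalchemy
    from google.cloud.bigquery import TimePartitioning, TimePartitioningType

    some_table = sqlalchemy.Table(
        "some_table",
        sqlalchemy.MetaData(),
        sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP),
        bigquery_time_partitioning=TimePartitioning(
            field="createdAt",
            type_=TimePartitioningType.DAY,
            expiration_ms=2592000000,  # 2592000000 ms / 86400000 ms per day = 30.0 days
        ),
    )

    # Expected DDL tail:
    #   PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)
    #   OPTIONS(partition_expiration_days=30.0)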
""" bq_opts = table.dialect_options["bigquery"] + options = {} clauses = [] - if "partitioning" in bq_opts: - partition_expr = bq_opts.get("partitioning") + if "time_partitioning" in bq_opts: + time_partitioning: TimePartitioning = bq_opts.get("time_partitioning") + + self._raise_for_type( + "time_partitioning", + time_partitioning, + TimePartitioning, + ) - if not isinstance(partition_expr, str): - raise TypeError( - f"bigquery_partitioning dialect option accepts only {str}," - f"provided {repr(partition_expr)}" + if time_partitioning.expiration_ms: + options["partition_expiration_days"] = ( + time_partitioning.expiration_ms / 86400000 ) - clauses.append(f"PARTITION BY {partition_expr}") + partition_by_clause = self._process_time_partitioning( + table, + time_partitioning, + ) + + clauses.append(partition_by_clause) if "clustering_fields" in bq_opts: clustering_fields = bq_opts.get("clustering_fields") - if not isinstance(clustering_fields, list): - raise TypeError( - f"bigquery_clustering_fields dialect option accepts only {list}," - f"provided {repr(clustering_fields)}" - ) + self._raise_for_type("clustering_fields", clustering_fields, list) for field in clustering_fields: if field not in table.c: @@ -711,10 +720,9 @@ def post_create_table(self, table): clauses.append(f"CLUSTER BY {', '.join(clustering_fields)}") - options = {} - if ("description" in bq_opts) or table.comment: description = bq_opts.get("description", table.comment) + self._validate_option_value_type("description", description) options["description"] = description for option in self.option_datatype_mapping: @@ -760,14 +768,51 @@ def _validate_option_value_type(self, option: str, value): `self.option_datatype_mapping`. """ if option in self.option_datatype_mapping: - if type(value) is not self.option_datatype_mapping[option]: - raise TypeError( - f"bigquery_{option} dialect option accepts only {self.option_datatype_mapping[option]}," - f"provided {repr(value)}" - ) + self._raise_for_type( + option, + value, + self.option_datatype_mapping[option], + ) return True + def _raise_for_type(self, option, value, expected_type): + if type(value) is not expected_type: + raise TypeError( + f"bigquery_{option} dialect option accepts only {expected_type}," + f"provided {repr(value)}" + ) + + def _process_time_partitioning( + self, table: Table, time_partitioning: TimePartitioning + ): + """ + Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp. + + Parameters: + - table (Table): The SQLAlchemy table to be partitioned. + - time_partitioning (TimePartitioning): The time partitioning details, + including the field to be used for partitioning. + + Returns: + - str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to + partition data on the specified field. + + Example: + - Given a table with a TIMESTAMP type column 'event_timestamp' and setting + 'time_partitioning.field' to 'event_timestamp', the function returns + "PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)". + """ + if isinstance( + table.columns[time_partitioning.field].type, + sqlalchemy.sql.sqltypes.TIMESTAMP, + ): + trunc_fn = "TIMESTAMP_TRUNC" + else: + trunc_fn = "DATE_TRUNC" + + return f"PARTITION BY {trunc_fn}({time_partitioning.field}, {time_partitioning.type_})" + def _process_option_value(self, value): """ Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. 
@@ -785,6 +830,7 @@ def _process_option_value(self, value): # Mapping from option type to its casting method str: lambda x: process_string_literal(x), int: lambda x: x, + float: lambda x: x, bool: lambda x: "true" if x else "false", datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x), } diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index f46676cd..b075a9a4 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -22,6 +22,8 @@ import pytest import sqlalchemy +from google.cloud.bigquery import TimePartitioning, TimePartitioningType + from .conftest import setup_table @@ -41,20 +43,6 @@ def test_table_expiration_timestamp_dialect_option(faux_conn): ) -def test_table_partition_expiration_days_dialect_option(faux_conn): - setup_table( - faux_conn, - "some_table", - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_partition_expiration_days=30, - ) - - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `createdAt` DATETIME )" - " OPTIONS(partition_expiration_days=30)" - ) - - def test_table_require_partition_filter_dialect_option(faux_conn): setup_table( faux_conn, @@ -125,7 +113,24 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn): ) -def test_table_partitioning_dialect_option(faux_conn): +def test_table_time_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(field="createdAt", type_="DAY"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_by_month_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): setup_table( @@ -133,24 +138,65 @@ def test_table_partitioning_dialect_option(faux_conn): "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_partitioning="DATE(createdAt)", + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_=TimePartitioningType.MONTH, + ), ) assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" - " PARTITION BY DATE(createdAt)" + " PARTITION BY DATE_TRUNC(createdAt, MONTH)" + ) + + +def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP), + bigquery_time_partitioning=TimePartitioning(field="createdAt", type_="DAY"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )" + " PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn): + # expect table creation to fail as SQLite does 
not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_="DAY", + expiration_ms=21600000, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(partition_expiration_days=0.25)" ) def test_table_partitioning_dialect_option_type_error(faux_conn): - # expect TypeError when bigquery_partitioning is not a string + # expect TypeError when bigquery_time_partitioning is not a TimePartitioning object with pytest.raises(TypeError): setup_table( faux_conn, "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_partitioning=["DATE(createdAt)"], + bigquery_time_partitioning="DATE(createdAt)", ) @@ -167,18 +213,21 @@ def test_table_all_dialect_option(faux_conn): bigquery_expiration_timestamp=datetime.datetime.fromisoformat( "2038-01-01T00:00:00+00:00" ), - bigquery_partition_expiration_days=30, bigquery_require_partition_filter=True, bigquery_default_rounding_mode="ROUND_HALF_EVEN", bigquery_clustering_fields=["country", "town"], - bigquery_partitioning="DATE(createdAt)", + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_="DAY", + expiration_ms=2592000000, + ), ) assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )" - " PARTITION BY DATE(createdAt)" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" " CLUSTER BY country, town" - " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', partition_expiration_days=30, require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" + " OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" ) @@ -203,15 +252,6 @@ def test_validate_expiration_timestamp_value_type(ddl_compiler): ddl_compiler._validate_option_value_type("expiration_timestamp", "2038-01-01") -def test_validate_partition_expiration_days_type(ddl_compiler): - # expect option value to be transformed as an integer - - assert ddl_compiler._validate_option_value_type("partition_expiration_days", 90) - - with pytest.raises(TypeError): - ddl_compiler._validate_option_value_type("partition_expiration_days", "90") - - def test_validate_require_partition_filter_type(ddl_compiler): # expect option value to be transformed as a literal boolean From 056b49b8c992c4a8fb8aaddb8657b4ad21c368e1 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 00:39:35 +0100 Subject: [PATCH 19/31] fix: TimePartitioning.field is optional --- sqlalchemy_bigquery/base.py | 21 ++++++++++++--------- tests/unit/test_table_options.py | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index db26e68f..ec0a0a3a 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -803,15 +803,18 @@ def _process_time_partitioning( 'time_partitioning.field' to 'event_timestamp', the function returns "PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)". 
""" - if isinstance( - table.columns[time_partitioning.field].type, - sqlalchemy.sql.sqltypes.TIMESTAMP, - ): - trunc_fn = "TIMESTAMP_TRUNC" - else: - trunc_fn = "DATE_TRUNC" - - return f"PARTITION BY {trunc_fn}({time_partitioning.field}, {time_partitioning.type_})" + field = "_PARTITIONDATE" + trunc_fn = "DATE_TRUNC" + + if time_partitioning.field is not None: + field = time_partitioning.field + if isinstance( + table.columns[time_partitioning.field].type, + sqlalchemy.sql.sqltypes.TIMESTAMP, + ): + trunc_fn = "TIMESTAMP_TRUNC" + + return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})" def _process_option_value(self, value): """ diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index b075a9a4..20089371 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -114,6 +114,23 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn): def test_table_time_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)" + ) + + +def test_table_time_partitioning_with_field_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): setup_table( From 2ae57e51367b1283a217adc5af5016e63e198342 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 01:07:09 +0100 Subject: [PATCH 20/31] chore: coding style --- sqlalchemy_bigquery/base.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index ec0a0a3a..8d7080fd 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -688,9 +688,7 @@ def post_create_table(self, table): options = {} clauses = [] - if "time_partitioning" in bq_opts: - time_partitioning: TimePartitioning = bq_opts.get("time_partitioning") - + if (time_partitioning := bq_opts.get("time_partitioning")) is not None: self._raise_for_type( "time_partitioning", time_partitioning, @@ -709,9 +707,7 @@ def post_create_table(self, table): clauses.append(partition_by_clause) - if "clustering_fields" in bq_opts: - clustering_fields = bq_opts.get("clustering_fields") - + if (clustering_fields := bq_opts.get("clustering_fields")) is not None: self._raise_for_type("clustering_fields", clustering_fields, list) for field in clustering_fields: From 9ab5acba546368b45cf51c59c24450721d785e3d Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 01:44:44 +0100 Subject: [PATCH 21/31] test: fix system test with table option bigquery_require_partition_filter --- tests/unit/test_table_options.py | 36 ++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 20089371..f875b5d9 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -43,20 +43,6 @@ def test_table_expiration_timestamp_dialect_option(faux_conn): ) -def test_table_require_partition_filter_dialect_option(faux_conn): - 
setup_table( - faux_conn, - "some_table", - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_require_partition_filter=True, - ) - - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `createdAt` DATETIME )" - " OPTIONS(require_partition_filter=true)" - ) - - def test_table_default_rounding_mode_dialect_option(faux_conn): setup_table( faux_conn, @@ -130,6 +116,24 @@ def test_table_time_partitioning_dialect_option(faux_conn): ) +def test_table_require_partition_filter_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + bigquery_require_partition_filter=True, + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(require_partition_filter=true)" + ) + + def test_table_time_partitioning_with_field_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): @@ -138,7 +142,7 @@ def test_table_time_partitioning_with_field_dialect_option(faux_conn): "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_time_partitioning=TimePartitioning(field="createdAt", type_="DAY"), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), ) assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( @@ -175,7 +179,7 @@ def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn): "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP), - bigquery_time_partitioning=TimePartitioning(field="createdAt", type_="DAY"), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), ) assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( From 3c692367e8c4618b4510c4e1705aa6cf8618c699 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 01:59:57 +0100 Subject: [PATCH 22/31] feat: add support for experimental range_partitioning option --- sqlalchemy_bigquery/base.py | 76 ++++++++++++++++++++- tests/unit/test_table_options.py | 112 ++++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 3 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 8d7080fd..585ce071 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -29,6 +29,7 @@ import google.api_core.exceptions from google.cloud.bigquery import dbapi from google.cloud.bigquery.table import ( + RangePartitioning, TableReference, TimePartitioning, ) @@ -688,6 +689,15 @@ def post_create_table(self, table): options = {} clauses = [] + if ( + bq_opts.get("time_partitioning") is not None + and bq_opts.get("range_partitioning") is not None + ): + raise ValueError( + "biquery_time_partitioning and bigquery_range_partitioning" + " dialect options are mutually exclusive." 
+ ) + if (time_partitioning := bq_opts.get("time_partitioning")) is not None: self._raise_for_type( "time_partitioning", @@ -707,6 +717,20 @@ def post_create_table(self, table): clauses.append(partition_by_clause) + if (range_partitioning := bq_opts.get("range_partitioning")) is not None: + self._raise_for_type( + "range_partitioning", + range_partitioning, + RangePartitioning, + ) + + partition_by_clause = self._process_range_partitioning( + table, + range_partitioning, + ) + + clauses.append(partition_by_clause) + if (clustering_fields := bq_opts.get("clustering_fields")) is not None: self._raise_for_type("clustering_fields", clustering_fields, list) @@ -776,7 +800,7 @@ def _raise_for_type(self, option, value, expected_type): if type(value) is not expected_type: raise TypeError( f"bigquery_{option} dialect option accepts only {expected_type}," - f"provided {repr(value)}" + f" provided {repr(value)}" ) def _process_time_partitioning( @@ -786,7 +810,7 @@ def _process_time_partitioning( Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp. Parameters: - - table (Table): The SQLAlchemy table to be partitioned. + - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. - time_partitioning (TimePartitioning): The time partitioning details, including the field to be used for partitioning. @@ -812,6 +836,54 @@ def _process_time_partitioning( return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})" + def _process_range_partitioning( + self, table: Table, range_partitioning: RangePartitioning + ): + """ + Generates a SQL 'PARTITION BY' clause for partitioning a table by a range of integers. + + Parameters: + - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. + - range_partitioning (RangePartitioning): The RangePartitioning object containing the + partitioning field, range start, range end, and interval. + + Returns: + - str: A SQL string for range partitioning using RANGE_BUCKET and GENERATE_ARRAY functions. + + Raises: + - ValueError: If the partitioning field is not defined, not an integer type, or if the range + start/end values are not integers. + + Example: + "PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 10))" + """ + if range_partitioning.field is None: + raise ValueError("bigquery_range_partitioning expects field to be defined") + + if not isinstance( + table.columns[range_partitioning.field].type, + sqlalchemy.sql.sqltypes.INT, + ): + raise ValueError( + "bigquery_range_partitioning expects field data type to be INTEGER" + ) + + range_ = range_partitioning.range_ + + if not isinstance(range_.start, int): + raise ValueError( + "bigquery_range_partitioning expects range_.start to be an int," + f" provided {repr(range_.start)}" + ) + + if not isinstance(range_.end, int): + raise ValueError( + "bigquery_range_partitioning expects range_.end to be an int," + f" provided {repr(range_.end)}" + ) + + return f"PARTITION BY RANGE_BUCKET({range_partitioning.field}, GENERATE_ARRAY({range_.start}, {range_.end}, {range_.interval or 1}))" + def _process_option_value(self, value): """ Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. 
diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index f875b5d9..5a9a7b24 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -22,7 +22,12 @@ import pytest import sqlalchemy -from google.cloud.bigquery import TimePartitioning, TimePartitioningType +from google.cloud.bigquery import ( + PartitionRange, + RangePartitioning, + TimePartitioning, + TimePartitioningType, +) from .conftest import setup_table @@ -221,6 +226,111 @@ def test_table_partitioning_dialect_option_type_error(faux_conn): ) +def test_table_range_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + interval=2, + ), + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )" + " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 2))" + ) + + +def test_table_range_partitioning_dialect_option_no_field(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises( + ValueError, + match="bigquery_range_partitioning expects field to be defined", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.FLOAT), + bigquery_range_partitioning=RangePartitioning( + range_=PartitionRange( + start=0, + end=100000, + interval=10, + ), + ), + ) + + +def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises( + ValueError, + match="bigquery_range_partitioning expects field data type to be INTEGER", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.FLOAT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + interval=10, + ), + ), + ) + + +def test_table_range_partitioning_dialect_option_range_missing(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises( + ValueError, + match="bigquery_range_partitioning expects range_.start to be an int, provided None", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning(field="zipcode"), + ) + + +def test_table_range_partitioning_dialect_option_default_interval(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange( + start=0, + end=100000, + ), + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )" + " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 1))" + ) + + 
def test_table_all_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support clustering and partitioned tables with pytest.raises(sqlite3.OperationalError): From f039b17fbca28db493357add9aaf2e6242726abd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 11:41:48 +0100 Subject: [PATCH 23/31] test: fix system test with new bigquery_time_partitioning table option --- tests/system/test_alembic.py | 4 ++-- tests/system/test_sqlalchemy_bigquery.py | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/tests/system/test_alembic.py b/tests/system/test_alembic.py index c5422c46..30308c68 100644 --- a/tests/system/test_alembic.py +++ b/tests/system/test_alembic.py @@ -23,7 +23,7 @@ from sqlalchemy import Column, DateTime, Integer, String, Numeric import google.api_core.exceptions -from google.cloud.bigquery import SchemaField +from google.cloud.bigquery import SchemaField, TimePartitioning alembic = pytest.importorskip("alembic") @@ -143,7 +143,7 @@ def test_alembic_scenario(alembic_table): Column("account", Integer, nullable=False), Column("transaction_time", DateTime(), nullable=False), Column("amount", Numeric(11, 2), nullable=False), - bigquery_partitioning="DATE(transaction_time)", + bigquery_time_partitioning=TimePartitioning(field="transaction_time"), ) op.alter_column("transactions", "amount", nullable=True) diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 458a5be3..9d072b15 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -22,6 +22,8 @@ import datetime import decimal +from google.cloud.bigquery import TimePartitioning + from sqlalchemy.engine import create_engine from sqlalchemy.schema import Table, MetaData, Column from sqlalchemy.ext.declarative import declarative_base @@ -540,8 +542,10 @@ def test_create_table(engine, bigquery_dataset): bigquery_description="test table description", bigquery_friendly_name="test table name", bigquery_expiration_timestamp=datetime.datetime(2183, 3, 26, 8, 30, 0), - bigquery_partitioning="DATE(timestamp_c)", - bigquery_partition_expiration_days=30, + bigquery_time_partitioning=TimePartitioning( + field="timestamp_c", + expiration_ms=1000 * 60 * 60 * 24 * 30, + ), bigquery_require_partition_filter=True, bigquery_default_rounding_mode="ROUND_HALF_EVEN", bigquery_clustering_fields=["integer_c", "decimal_c"], From 27992e46742daa10c705251763b3421f1ed8d715 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Fri, 22 Dec 2023 11:47:20 +0100 Subject: [PATCH 24/31] docs: update README with time_partitioning and range_partitioning --- README.rst | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/README.rst b/README.rst index b014bea3..17534886 100644 --- a/README.rst +++ b/README.rst @@ -311,13 +311,43 @@ To create a clustered table: table = Table('mytable', ..., bigquery_clustering_fields=["a", "b", "c"]) -To create a partitioned table: +To create a time-unit column-partitioned table: .. code-block:: python + from google.cloud import bigquery + + table = Table('mytable', ..., + bigquery_time_partitioning=bigquery.TimePartitioning( + field="mytimestamp", + type_="MONTH", + expiration_ms=1000 * 60 * 60 * 24 * 30 * 6, # 6 months + ), + bigquery_require_partition_filter=True, + ) + +To create an ingestion-time partitioned table: + +.. 
code-block:: python + + from google.cloud import bigquery + + table = Table('mytable', ..., + bigquery_time_partitioning=bigquery.TimePartitioning(), + bigquery_require_partition_filter=True, + ) + +To create an integer-range partitioned table + +.. code-block:: python + + from google.cloud import bigquery + table = Table('mytable', ..., - bigquery_partitioning="DATE(mytimestamp)", - bigquery_partition_expiration_days=90, + bigquery_range_partitioning=bigquery.RangePartitioning( + field="zipcode", + range_=bigquery.PartitionRange(start=0, end=100000, interval=10), + ), bigquery_require_partition_filter=True, ) From 6fc0354356e843d4ee885a82ec822839d6af0397 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 4 Jan 2024 17:59:11 +0100 Subject: [PATCH 25/31] test: relevant comments in unit tests --- tests/unit/test_table_options.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 5a9a7b24..45512a2a 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -251,7 +251,7 @@ def test_table_range_partitioning_dialect_option(faux_conn): def test_table_range_partitioning_dialect_option_no_field(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables + # expect TypeError when bigquery_range_partitioning field is not defined with pytest.raises( ValueError, match="bigquery_range_partitioning expects field to be defined", @@ -272,7 +272,7 @@ def test_table_range_partitioning_dialect_option_no_field(faux_conn): def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables + # expect ValueError when bigquery_range_partitioning field is not an INTEGER with pytest.raises( ValueError, match="bigquery_range_partitioning expects field data type to be INTEGER", @@ -294,7 +294,7 @@ def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): def test_table_range_partitioning_dialect_option_range_missing(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables + # expect ValueError when bigquery_range_partitioning range start or end is missing with pytest.raises( ValueError, match="bigquery_range_partitioning expects range_.start to be an int, provided None", From 71531a75e81e90d333bf63db64e44d3e0ffa7d80 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Thu, 4 Jan 2024 18:02:54 +0100 Subject: [PATCH 26/31] test: cover all error cases --- tests/unit/test_table_options.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 45512a2a..808b918e 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -307,6 +307,21 @@ def test_table_range_partitioning_dialect_option_range_missing(faux_conn): bigquery_range_partitioning=RangePartitioning(field="zipcode"), ) + with pytest.raises( + ValueError, + match="bigquery_range_partitioning expects range_.end to be an int, provided None", + ): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("zipcode", sqlalchemy.INT), + bigquery_range_partitioning=RangePartitioning( + field="zipcode", + range_=PartitionRange(start=1), + ), + ) + def test_table_range_partitioning_dialect_option_default_interval(faux_conn): # expect table creation to fail as SQLite does not support partitioned tables @@ -331,6 
+346,19 @@ def test_table_range_partitioning_dialect_option_default_interval(faux_conn): ) +def test_time_and_range_partitioning_mutually_exclusive(faux_conn): + # expect ValueError when both bigquery_time_partitioning and bigquery_range_partitioning are provided + with pytest.raises(ValueError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_range_partitioning=RangePartitioning(), + bigquery_time_partitioning=TimePartitioning(), + ) + + def test_table_all_dialect_option(faux_conn): # expect table creation to fail as SQLite does not support clustering and partitioned tables with pytest.raises(sqlite3.OperationalError): From 995d1e5d7a407b9a425e859e88ebf660ea97c352 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 9 Jan 2024 16:41:14 +0100 Subject: [PATCH 27/31] chore: no magic numbers --- sqlalchemy_bigquery/base.py | 3 ++- tests/system/test_sqlalchemy_bigquery.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 585ce071..1c86a95f 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -706,8 +706,9 @@ def post_create_table(self, table): ) if time_partitioning.expiration_ms: + _24hours = 1000 * 60 * 60 * 24 options["partition_expiration_days"] = ( - time_partitioning.expiration_ms / 86400000 + time_partitioning.expiration_ms / _24hours ) partition_by_clause = self._process_time_partitioning( diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 9d072b15..cccbd4bb 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -544,7 +544,7 @@ def test_create_table(engine, bigquery_dataset): bigquery_expiration_timestamp=datetime.datetime(2183, 3, 26, 8, 30, 0), bigquery_time_partitioning=TimePartitioning( field="timestamp_c", - expiration_ms=1000 * 60 * 60 * 24 * 30, + expiration_ms=1000 * 60 * 60 * 24 * 30, # 30 days ), bigquery_require_partition_filter=True, bigquery_default_rounding_mode="ROUND_HALF_EVEN", From a9b8d2732b71b5e41d7f663cf2c0b8ec1674ddcb Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 9 Jan 2024 16:41:40 +0100 Subject: [PATCH 28/31] chore: consistency in docstrings --- sqlalchemy_bigquery/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 1c86a95f..5039946e 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -810,7 +810,7 @@ def _process_time_partitioning( """ Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp. - Parameters: + Args: - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. - time_partitioning (TimePartitioning): The time partitioning details, including the field to be used for partitioning. @@ -843,7 +843,7 @@ def _process_range_partitioning( """ Generates a SQL 'PARTITION BY' clause for partitioning a table by a range of integers. - Parameters: + Args: - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. - range_partitioning (RangePartitioning): The RangePartitioning object containing the partitioning field, range start, range end, and interval. 
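Taken together, the dialect options introduced in the preceding patches can be driven from an ordinary SQLAlchemy table definition. The sketch below is illustrative only and is not part of any patch in this series: the table names, column names, and the project/dataset in the engine URL are placeholders, and it assumes the `bigquery_time_partitioning`, `bigquery_range_partitioning`, and `bigquery_require_partition_filter` options behave as defined above.

.. code-block:: python

    # Illustrative sketch only -- names and the engine URL are placeholders.
    import sqlalchemy
    from google.cloud import bigquery

    metadata = sqlalchemy.MetaData()

    # Time-unit column partitioning; expiration_ms is rendered as
    # partition_expiration_days in the generated OPTIONS clause.
    events = sqlalchemy.Table(
        "events",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer),
        sqlalchemy.Column("created_at", sqlalchemy.DateTime),
        bigquery_time_partitioning=bigquery.TimePartitioning(
            field="created_at",
            type_="DAY",
            expiration_ms=1000 * 60 * 60 * 24 * 90,  # 90 days
        ),
        bigquery_require_partition_filter=True,
    )

    # Integer-range partitioning; the partitioning column must be an
    # INTEGER column, and interval defaults to 1 when omitted.
    addresses = sqlalchemy.Table(
        "addresses",
        metadata,
        sqlalchemy.Column("zipcode", sqlalchemy.INT),
        bigquery_range_partitioning=bigquery.RangePartitioning(
            field="zipcode",
            range_=bigquery.PartitionRange(start=0, end=100000, interval=10),
        ),
    )

    engine = sqlalchemy.create_engine("bigquery://some-project/some-dataset")
    metadata.create_all(engine)  # emits CREATE TABLE ... PARTITION BY ...

Note that time-based and range-based partitioning are kept as two separate options mirroring the BigQuery client's TimePartitioning and RangePartitioning objects, and the dialect rejects tables that specify both, since BigQuery allows only one partitioning scheme per table.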
From 37c1eb082ef61f6c2701debe5e5f7b6bb073db00 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 9 Jan 2024 17:34:15 +0100 Subject: [PATCH 29/31] chore: no magic number --- sqlalchemy_bigquery/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 5039946e..8e389390 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -883,7 +883,9 @@ def _process_range_partitioning( f" provided {repr(range_.end)}" ) - return f"PARTITION BY RANGE_BUCKET({range_partitioning.field}, GENERATE_ARRAY({range_.start}, {range_.end}, {range_.interval or 1}))" + default_interval = 1 + + return f"PARTITION BY RANGE_BUCKET({range_partitioning.field}, GENERATE_ARRAY({range_.start}, {range_.end}, {range_.interval or default_interval}))" def _process_option_value(self, value): """ From badece47590170680441ac4f2020e67d22e26aa8 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Tue, 9 Jan 2024 18:26:48 +0100 Subject: [PATCH 30/31] chore: better error types --- sqlalchemy_bigquery/base.py | 15 +++++++++------ tests/unit/test_table_options.py | 10 +++++----- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 8e389390..f4266f13 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -852,33 +852,36 @@ def _process_range_partitioning( - str: A SQL string for range partitioning using RANGE_BUCKET and GENERATE_ARRAY functions. Raises: - - ValueError: If the partitioning field is not defined, not an integer type, or if the range - start/end values are not integers. + - AttributeError: If the partitioning field is not defined. + - ValueError: If the partitioning field (i.e. column) data type is not an integer. + - TypeError: If the partitioning range start/end values are not integers. Example: "PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 10))" """ if range_partitioning.field is None: - raise ValueError("bigquery_range_partitioning expects field to be defined") + raise AttributeError( + "bigquery_range_partitioning expects field to be defined" + ) if not isinstance( table.columns[range_partitioning.field].type, sqlalchemy.sql.sqltypes.INT, ): raise ValueError( - "bigquery_range_partitioning expects field data type to be INTEGER" + "bigquery_range_partitioning expects field (i.e. 
column) data type to be INTEGER" ) range_ = range_partitioning.range_ if not isinstance(range_.start, int): - raise ValueError( + raise TypeError( "bigquery_range_partitioning expects range_.start to be an int," f" provided {repr(range_.start)}" ) if not isinstance(range_.end, int): - raise ValueError( + raise TypeError( "bigquery_range_partitioning expects range_.end to be an int," f" provided {repr(range_.end)}" ) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 808b918e..9ff8c3cd 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -253,7 +253,7 @@ def test_table_range_partitioning_dialect_option(faux_conn): def test_table_range_partitioning_dialect_option_no_field(faux_conn): # expect TypeError when bigquery_range_partitioning field is not defined with pytest.raises( - ValueError, + AttributeError, match="bigquery_range_partitioning expects field to be defined", ): setup_table( @@ -275,7 +275,7 @@ def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): # expect ValueError when bigquery_range_partitioning field is not an INTEGER with pytest.raises( ValueError, - match="bigquery_range_partitioning expects field data type to be INTEGER", + match="bigquery_range_partitioning expects field \(i\.e\. column\) data type to be INTEGER", ): setup_table( faux_conn, @@ -294,9 +294,9 @@ def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): def test_table_range_partitioning_dialect_option_range_missing(faux_conn): - # expect ValueError when bigquery_range_partitioning range start or end is missing + # expect TypeError when bigquery_range_partitioning range start or end is missing with pytest.raises( - ValueError, + TypeError, match="bigquery_range_partitioning expects range_.start to be an int, provided None", ): setup_table( @@ -308,7 +308,7 @@ def test_table_range_partitioning_dialect_option_range_missing(faux_conn): ) with pytest.raises( - ValueError, + TypeError, match="bigquery_range_partitioning expects range_.end to be an int, provided None", ): setup_table( From 8184c38fd9a5e4e16e5398e54335dd1df2b44180 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Wed, 10 Jan 2024 00:32:30 +0100 Subject: [PATCH 31/31] chore: fix W605 invalid escape sequence --- tests/unit/test_table_options.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 9ff8c3cd..2147fb1d 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -275,7 +275,7 @@ def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn): # expect ValueError when bigquery_range_partitioning field is not an INTEGER with pytest.raises( ValueError, - match="bigquery_range_partitioning expects field \(i\.e\. column\) data type to be INTEGER", + match=r"bigquery_range_partitioning expects field \(i\.e\. column\) data type to be INTEGER", ): setup_table( faux_conn,