diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 40bf9973..e4e943e0 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -1,4 +1,4 @@ -# Copyright 2023 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:230f7fe8a0d2ed81a519cfc15c6bb11c5b46b9fb449b8b1219b3771bcb520ad2 -# created: 2023-12-09T15:16:25.430769578Z + digest: sha256:98f3afd11308259de6e828e37376d18867fd321aba07826e29e4f8d9cab56bad +# created: 2024-02-27T15:56:18.442440378Z diff --git a/.github/sync-repo-settings.yaml b/.github/sync-repo-settings.yaml index 68af1253..38bd545c 100644 --- a/.github/sync-repo-settings.yaml +++ b/.github/sync-repo-settings.yaml @@ -14,6 +14,8 @@ branchProtectionRules: - 'Samples - Python 3.8' - 'Samples - Python 3.9' - 'Samples - Python 3.10' + - 'Samples - Python 3.11' + - 'Samples - Python 3.12' permissionRules: - team: actools-python permission: admin diff --git a/.kokoro/requirements.txt b/.kokoro/requirements.txt index e5c1ffca..bda8e38c 100644 --- a/.kokoro/requirements.txt +++ b/.kokoro/requirements.txt @@ -93,30 +93,39 @@ colorlog==6.7.0 \ # via # gcp-docuploader # nox -cryptography==41.0.6 \ - --hash=sha256:068bc551698c234742c40049e46840843f3d98ad7ce265fd2bd4ec0d11306596 \ - --hash=sha256:0f27acb55a4e77b9be8d550d762b0513ef3fc658cd3eb15110ebbcbd626db12c \ - --hash=sha256:2132d5865eea673fe6712c2ed5fb4fa49dba10768bb4cc798345748380ee3660 \ - --hash=sha256:3288acccef021e3c3c10d58933f44e8602cf04dba96d9796d70d537bb2f4bbc4 \ - --hash=sha256:35f3f288e83c3f6f10752467c48919a7a94b7d88cc00b0668372a0d2ad4f8ead \ - --hash=sha256:398ae1fc711b5eb78e977daa3cbf47cec20f2c08c5da129b7a296055fbb22aed \ - --hash=sha256:422e3e31d63743855e43e5a6fcc8b4acab860f560f9321b0ee6269cc7ed70cc3 \ - --hash=sha256:48783b7e2bef51224020efb61b42704207dde583d7e371ef8fc2a5fb6c0aabc7 \ - --hash=sha256:4d03186af98b1c01a4eda396b137f29e4e3fb0173e30f885e27acec8823c1b09 \ - --hash=sha256:5daeb18e7886a358064a68dbcaf441c036cbdb7da52ae744e7b9207b04d3908c \ - --hash=sha256:60e746b11b937911dc70d164060d28d273e31853bb359e2b2033c9e93e6f3c43 \ - --hash=sha256:742ae5e9a2310e9dade7932f9576606836ed174da3c7d26bc3d3ab4bd49b9f65 \ - --hash=sha256:7e00fb556bda398b99b0da289ce7053639d33b572847181d6483ad89835115f6 \ - --hash=sha256:85abd057699b98fce40b41737afb234fef05c67e116f6f3650782c10862c43da \ - --hash=sha256:8efb2af8d4ba9dbc9c9dd8f04d19a7abb5b49eab1f3694e7b5a16a5fc2856f5c \ - --hash=sha256:ae236bb8760c1e55b7a39b6d4d32d2279bc6c7c8500b7d5a13b6fb9fc97be35b \ - --hash=sha256:afda76d84b053923c27ede5edc1ed7d53e3c9f475ebaf63c68e69f1403c405a8 \ - --hash=sha256:b27a7fd4229abef715e064269d98a7e2909ebf92eb6912a9603c7e14c181928c \ - --hash=sha256:b648fe2a45e426aaee684ddca2632f62ec4613ef362f4d681a9a6283d10e079d \ - --hash=sha256:c5a550dc7a3b50b116323e3d376241829fd326ac47bc195e04eb33a8170902a9 \ - --hash=sha256:da46e2b5df770070412c46f87bac0849b8d685c5f2679771de277a422c7d0b86 \ - --hash=sha256:f39812f70fc5c71a15aa3c97b2bbe213c3f2a460b79bd21c40d033bb34a9bf36 \ - --hash=sha256:ff369dd19e8fe0528b02e8df9f2aeb2479f89b1270d90f96a63500afe9af5cae +cryptography==42.0.4 \ + --hash=sha256:01911714117642a3f1792c7f376db572aadadbafcd8d75bb527166009c9f1d1b \ + --hash=sha256:0e89f7b84f421c56e7ff69f11c441ebda73b8a8e6488d322ef71746224c20fce \ + 
--hash=sha256:12d341bd42cdb7d4937b0cabbdf2a94f949413ac4504904d0cdbdce4a22cbf88 \ + --hash=sha256:15a1fb843c48b4a604663fa30af60818cd28f895572386e5f9b8a665874c26e7 \ + --hash=sha256:1cdcdbd117681c88d717437ada72bdd5be9de117f96e3f4d50dab3f59fd9ab20 \ + --hash=sha256:1df6fcbf60560d2113b5ed90f072dc0b108d64750d4cbd46a21ec882c7aefce9 \ + --hash=sha256:3c6048f217533d89f2f8f4f0fe3044bf0b2090453b7b73d0b77db47b80af8dff \ + --hash=sha256:3e970a2119507d0b104f0a8e281521ad28fc26f2820687b3436b8c9a5fcf20d1 \ + --hash=sha256:44a64043f743485925d3bcac548d05df0f9bb445c5fcca6681889c7c3ab12764 \ + --hash=sha256:4e36685cb634af55e0677d435d425043967ac2f3790ec652b2b88ad03b85c27b \ + --hash=sha256:5f8907fcf57392cd917892ae83708761c6ff3c37a8e835d7246ff0ad251d9298 \ + --hash=sha256:69b22ab6506a3fe483d67d1ed878e1602bdd5912a134e6202c1ec672233241c1 \ + --hash=sha256:6bfadd884e7280df24d26f2186e4e07556a05d37393b0f220a840b083dc6a824 \ + --hash=sha256:6d0fbe73728c44ca3a241eff9aefe6496ab2656d6e7a4ea2459865f2e8613257 \ + --hash=sha256:6ffb03d419edcab93b4b19c22ee80c007fb2d708429cecebf1dd3258956a563a \ + --hash=sha256:810bcf151caefc03e51a3d61e53335cd5c7316c0a105cc695f0959f2c638b129 \ + --hash=sha256:831a4b37accef30cccd34fcb916a5d7b5be3cbbe27268a02832c3e450aea39cb \ + --hash=sha256:887623fe0d70f48ab3f5e4dbf234986b1329a64c066d719432d0698522749929 \ + --hash=sha256:a0298bdc6e98ca21382afe914c642620370ce0470a01e1bef6dd9b5354c36854 \ + --hash=sha256:a1327f280c824ff7885bdeef8578f74690e9079267c1c8bd7dc5cc5aa065ae52 \ + --hash=sha256:c1f25b252d2c87088abc8bbc4f1ecbf7c919e05508a7e8628e6875c40bc70923 \ + --hash=sha256:c3a5cbc620e1e17009f30dd34cb0d85c987afd21c41a74352d1719be33380885 \ + --hash=sha256:ce8613beaffc7c14f091497346ef117c1798c202b01153a8cc7b8e2ebaaf41c0 \ + --hash=sha256:d2a27aca5597c8a71abbe10209184e1a8e91c1fd470b5070a2ea60cafec35bcd \ + --hash=sha256:dad9c385ba8ee025bb0d856714f71d7840020fe176ae0229de618f14dae7a6e2 \ + --hash=sha256:db4b65b02f59035037fde0998974d84244a64c3265bdef32a827ab9b63d61b18 \ + --hash=sha256:e09469a2cec88fb7b078e16d4adec594414397e8879a4341c6ace96013463d5b \ + --hash=sha256:e53dc41cda40b248ebc40b83b31516487f7db95ab8ceac1f042626bc43a2f992 \ + --hash=sha256:f1e85a178384bf19e36779d91ff35c7617c885da487d689b05c1366f9933ad74 \ + --hash=sha256:f47be41843200f7faec0683ad751e5ef11b9a56a220d57f300376cd8aba81660 \ + --hash=sha256:fb0cef872d8193e487fc6bdb08559c3aa41b659a7d9be48b2e10747f47863925 \ + --hash=sha256:ffc73996c4fca3d2b6c1c8c12bfd3ad00def8621da24f547626bf06441400449 # via # gcp-releasetool # secretstorage @@ -263,9 +272,9 @@ jeepney==0.8.0 \ # via # keyring # secretstorage -jinja2==3.1.2 \ - --hash=sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852 \ - --hash=sha256:6088930bfe239f0e6710546ab9c19c9ef35e29792895fed6e6e31a023a182a61 +jinja2==3.1.3 \ + --hash=sha256:7d6d50dd97d52cbc355597bd845fabfbac3f551e1f99619e39a35ce8c370b5fa \ + --hash=sha256:ac8bd6544d4bb2c9792bf3a159e80bba8fda7f07e81bc3aed565432d5925ba90 # via gcp-releasetool keyring==24.2.0 \ --hash=sha256:4901caaf597bfd3bbd78c9a0c7c4c29fcd8310dab2cffefe749e916b6527acd6 \ diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c3b7ca7..52bbfe0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,18 @@ Older versions of this project were distributed as [pybigquery][0]. 
[2]: https://pypi.org/project/pybigquery/#history +## [1.10.0](https://github.com/googleapis/python-bigquery-sqlalchemy/compare/v1.9.0...v1.10.0) (2024-02-27) + + +### Features + +* Allow to set clustering and time partitioning options at table creation ([#928](https://github.com/googleapis/python-bigquery-sqlalchemy/issues/928)) ([c2c2958](https://github.com/googleapis/python-bigquery-sqlalchemy/commit/c2c2958886cc3c8a51c4f5fc1a8c36b65921edd9)) + + +### Bug Fixes + +* Avoid implicit join when using join with unnest ([#924](https://github.com/googleapis/python-bigquery-sqlalchemy/issues/924)) ([ac74a34](https://github.com/googleapis/python-bigquery-sqlalchemy/commit/ac74a3434c437f60b6f215ac09dea224aa406f8a)) + ## [1.9.0](https://github.com/googleapis/python-bigquery-sqlalchemy/compare/v1.8.0...v1.9.0) (2023-12-10) diff --git a/README.rst b/README.rst index a2036289..17534886 100644 --- a/README.rst +++ b/README.rst @@ -292,7 +292,12 @@ To add metadata to a table: .. code-block:: python - table = Table('mytable', ..., bigquery_description='my table description', bigquery_friendly_name='my table friendly name') + table = Table('mytable', ..., + bigquery_description='my table description', + bigquery_friendly_name='my table friendly name', + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_expiration_timestamp=datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"), + ) To add metadata to a column: @@ -300,6 +305,52 @@ To add metadata to a column: Column('mycolumn', doc='my column description') +To create a clustered table: + +.. code-block:: python + + table = Table('mytable', ..., bigquery_clustering_fields=["a", "b", "c"]) + +To create a time-unit column-partitioned table: + +.. code-block:: python + + from google.cloud import bigquery + + table = Table('mytable', ..., + bigquery_time_partitioning=bigquery.TimePartitioning( + field="mytimestamp", + type_="MONTH", + expiration_ms=1000 * 60 * 60 * 24 * 30 * 6, # 6 months + ), + bigquery_require_partition_filter=True, + ) + +To create an ingestion-time partitioned table: + +.. code-block:: python + + from google.cloud import bigquery + + table = Table('mytable', ..., + bigquery_time_partitioning=bigquery.TimePartitioning(), + bigquery_require_partition_filter=True, + ) + +To create an integer-range partitioned table + +.. 
code-block:: python + + from google.cloud import bigquery + + table = Table('mytable', ..., + bigquery_range_partitioning=bigquery.RangePartitioning( + field="zipcode", + range_=bigquery.PartitionRange(start=0, end=100000, interval=10), + ), + bigquery_require_partition_filter=True, + ) + Threading and Multiprocessing ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/dev_requirements.txt b/dev_requirements.txt index ddc53054..dac67503 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -2,4 +2,4 @@ sqlalchemy>=2.0.15,<2.1.0 google-cloud-bigquery>=1.6.0 pytest===6.2.5 pytest-flake8===1.1.0 # versions 1.1.1 and above require pytest 7 -pytz==2023.3 +pytz==2024.1 diff --git a/noxfile.py b/noxfile.py index e31f32c5..28f000db 100644 --- a/noxfile.py +++ b/noxfile.py @@ -394,10 +394,11 @@ def compliance(session): f"--junitxml=compliance_{session.python}_sponge_log.xml", "--reruns=3", "--reruns-delay=60", - "--only-rerun=403 Exceeded rate limits", - "--only-rerun=409 Already Exists", - "--only-rerun=404 Not found", - "--only-rerun=400 Cannot execute DML over a non-existent table", + "--only-rerun=Exceeded rate limits", + "--only-rerun=Already Exists", + "--only-rerun=Not found", + "--only-rerun=Cannot execute DML over a non-existent table", + "--only-rerun=Job exceeded rate limits", system_test_folder_path, *session.posargs, # To suppress the "Deprecated API features detected!" warning when @@ -427,7 +428,16 @@ def docs(session): session.install("-e", ".") session.install( - "sphinx==4.0.1", + # We need to pin to specific versions of the `sphinxcontrib-*` packages + # which still support sphinx 4.x. + # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344 + # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345. + "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", + "sphinx==4.5.0", "alabaster", "geoalchemy2", "shapely", @@ -455,6 +465,15 @@ def docfx(session): session.install("-e", ".") session.install( + # We need to pin to specific versions of the `sphinxcontrib-*` packages + # which still support sphinx 4.x. + # See https://github.com/googleapis/sphinx-docfx-yaml/issues/344 + # and https://github.com/googleapis/sphinx-docfx-yaml/issues/345. + "sphinxcontrib-applehelp==1.0.4", + "sphinxcontrib-devhelp==1.0.2", + "sphinxcontrib-htmlhelp==2.0.1", + "sphinxcontrib-qthelp==1.0.3", + "sphinxcontrib-serializinghtml==1.1.5", "gcp-sphinx-docfx-yaml", "alabaster", "geoalchemy2", diff --git a/owlbot.py b/owlbot.py index 22678c8b..8c3ce732 100644 --- a/owlbot.py +++ b/owlbot.py @@ -188,10 +188,11 @@ def compliance(session): f"--junitxml=compliance_{session.python}_sponge_log.xml", "--reruns=3", "--reruns-delay=60", - "--only-rerun=403 Exceeded rate limits", - "--only-rerun=409 Already Exists", - "--only-rerun=404 Not found", - "--only-rerun=400 Cannot execute DML over a non-existent table", + "--only-rerun=Exceeded rate limits", + "--only-rerun=Already Exists", + "--only-rerun=Not found", + "--only-rerun=Cannot execute DML over a non-existent table", + "--only-rerun=Job exceeded rate limits", system_test_folder_path, *session.posargs, # To suppress the "Deprecated API features detected!" 
warning when diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt index 90537d96..b39744f7 100644 --- a/samples/snippets/requirements-test.txt +++ b/samples/snippets/requirements-test.txt @@ -1,16 +1,16 @@ -attrs==23.1.0 -click==8.1.6 -google-auth==2.22.0 -google-cloud-testutils==1.3.3 +attrs==23.2.0 +click==8.1.7 +google-auth==2.28.1 +google-cloud-testutils==1.4.0 iniconfig==2.0.0 -packaging==23.1 -pluggy==1.2.0 +packaging==23.2 +pluggy==1.4.0 py==1.11.0 -pyasn1==0.5.0 +pyasn1==0.5.1 pyasn1-modules==0.3.0 pyparsing==3.1.1 pytest===6.2.5 rsa==4.9 six==1.16.0 toml==0.10.2 -typing-extensions==4.7.1 +typing-extensions==4.10.0 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index b15bf2cb..a1e62c32 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,33 +1,33 @@ -alembic==1.11.2 -certifi==2023.7.22 -charset-normalizer==3.2.0 -geoalchemy2==0.14.1 -google-api-core[grpc]==2.11.1 -google-auth==2.22.0 -google-cloud-bigquery==3.11.4 -google-cloud-core==2.3.3 +alembic==1.13.1 +certifi==2024.2.2 +charset-normalizer==3.3.2 +geoalchemy2==0.14.4 +google-api-core[grpc]==2.17.1 +google-auth==2.28.1 +google-cloud-bigquery==3.17.2 +google-cloud-core==2.4.1 google-crc32c==1.5.0 -google-resumable-media==2.5.0 -googleapis-common-protos==1.60.0 -greenlet==3.0.1 -grpcio==1.59.0 -grpcio-status==1.57.0 -idna==3.4 -importlib-resources==6.0.1; python_version >= '3.8' -mako==1.2.4 -markupsafe==2.1.3 -packaging==23.1 -proto-plus==1.22.3 -protobuf==4.24.0 -pyasn1==0.5.0 +google-resumable-media==2.7.0 +googleapis-common-protos==1.62.0 +greenlet==3.0.3 +grpcio==1.62.0 +grpcio-status==1.62.0 +idna==3.6 +importlib-resources==6.1.2; python_version >= '3.8' +mako==1.3.2 +markupsafe==2.1.5 +packaging==23.2 +proto-plus==1.23.0 +protobuf==4.25.3 +pyasn1==0.5.1 pyasn1-modules==0.3.0 pyparsing==3.1.1 python-dateutil==2.8.2 -pytz==2023.3 +pytz==2024.1 requests==2.31.0 rsa==4.9 -shapely==2.0.2 +shapely==2.0.3 six==1.16.0 sqlalchemy===1.4.27 -typing-extensions==4.7.1 -urllib3==1.26.18 +typing-extensions==4.10.0 +urllib3==2.2.1 diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index 5297f223..f4266f13 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -19,6 +19,7 @@ """Integration between SQLAlchemy and BigQuery.""" +import datetime from decimal import Decimal import random import operator @@ -27,7 +28,11 @@ from google import auth import google.api_core.exceptions from google.cloud.bigquery import dbapi -from google.cloud.bigquery.table import TableReference +from google.cloud.bigquery.table import ( + RangePartitioning, + TableReference, + TimePartitioning, +) from google.api_core.exceptions import NotFound import packaging.version import sqlalchemy @@ -35,7 +40,7 @@ import sqlalchemy.sql.functions import sqlalchemy.sql.sqltypes import sqlalchemy.sql.type_api -from sqlalchemy.exc import NoSuchTableError +from sqlalchemy.exc import NoSuchTableError, NoSuchColumnError from sqlalchemy import util from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.compiler import ( @@ -269,6 +274,14 @@ def _known_tables(self): if table is not None: known_tables.add(table.name) + # If we have the table in the `from` of our parent, do not add the alias + # as this will add the table twice and cause an implicit JOIN for that + # table on itself + asfrom_froms = self.stack[-1].get("asfrom_froms", []) + for from_ in asfrom_froms: + if isinstance(from_, Table): + 
known_tables.add(from_.name)
+
         return known_tables
 
     def visit_column(
@@ -623,6 +636,13 @@ def visit_NUMERIC(self, type_, **kw):
 
 
 class BigQueryDDLCompiler(DDLCompiler):
+    option_datatype_mapping = {
+        "friendly_name": str,
+        "expiration_timestamp": datetime.datetime,
+        "require_partition_filter": bool,
+        "default_rounding_mode": str,
+    }
+
     # BigQuery has no support for foreign keys.
     def visit_foreign_key_constraint(self, constraint):
         return None
@@ -646,26 +666,99 @@ def get_column_specification(self, column, **kwargs):
         return colspec
 
     def post_create_table(self, table):
+        """
+        Constructs additional SQL clauses for table creation in BigQuery.
+
+        This function processes the BigQuery dialect-specific options and generates SQL clauses for partitioning,
+        clustering, and other table options.
+
+        Args:
+            table (Table): The SQLAlchemy Table object for which the SQL is being generated.
+
+        Returns:
+            str: A string composed of SQL clauses for time partitioning, clustering, and other BigQuery-specific
+            options, each separated by a newline. Returns an empty string if no such options are specified.
+
+        Raises:
+            TypeError: If the time_partitioning option is not a `TimePartitioning` object or if the clustering_fields option is not a list.
+            NoSuchColumnError: If any field specified in clustering_fields does not exist in the table.
+        """
+
         bq_opts = table.dialect_options["bigquery"]
 
-        opts = []
-        if ("description" in bq_opts) or table.comment:
-            description = process_string_literal(
-                bq_opts.get("description", table.comment)
+        options = {}
+        clauses = []
+
+        if (
+            bq_opts.get("time_partitioning") is not None
+            and bq_opts.get("range_partitioning") is not None
+        ):
+            raise ValueError(
+                "bigquery_time_partitioning and bigquery_range_partitioning"
+                " dialect options are mutually exclusive."
+ ) + + if (time_partitioning := bq_opts.get("time_partitioning")) is not None: + self._raise_for_type( + "time_partitioning", + time_partitioning, + TimePartitioning, ) - opts.append(f"description={description}") - if "friendly_name" in bq_opts: - opts.append( - "friendly_name={}".format( - process_string_literal(bq_opts["friendly_name"]) + if time_partitioning.expiration_ms: + _24hours = 1000 * 60 * 60 * 24 + options["partition_expiration_days"] = ( + time_partitioning.expiration_ms / _24hours ) + + partition_by_clause = self._process_time_partitioning( + table, + time_partitioning, ) - if opts: - return "\nOPTIONS({})".format(", ".join(opts)) + clauses.append(partition_by_clause) - return "" + if (range_partitioning := bq_opts.get("range_partitioning")) is not None: + self._raise_for_type( + "range_partitioning", + range_partitioning, + RangePartitioning, + ) + + partition_by_clause = self._process_range_partitioning( + table, + range_partitioning, + ) + + clauses.append(partition_by_clause) + + if (clustering_fields := bq_opts.get("clustering_fields")) is not None: + self._raise_for_type("clustering_fields", clustering_fields, list) + + for field in clustering_fields: + if field not in table.c: + raise NoSuchColumnError(field) + + clauses.append(f"CLUSTER BY {', '.join(clustering_fields)}") + + if ("description" in bq_opts) or table.comment: + description = bq_opts.get("description", table.comment) + self._validate_option_value_type("description", description) + options["description"] = description + + for option in self.option_datatype_mapping: + if option in bq_opts: + options[option] = bq_opts.get(option) + + if options: + individual_option_statements = [ + "{}={}".format(k, self._process_option_value(v)) + for (k, v) in options.items() + if self._validate_option_value_type(k, v) + ] + clauses.append(f"OPTIONS({', '.join(individual_option_statements)})") + + return " " + "\n".join(clauses) def visit_set_table_comment(self, create): table_name = self.preparer.format_table(create.element) @@ -678,6 +771,152 @@ def visit_drop_table_comment(self, drop): table_name = self.preparer.format_table(drop.element) return f"ALTER TABLE {table_name} SET OPTIONS(description=null)" + def _validate_option_value_type(self, option: str, value): + """ + Validates the type of the given option value against the expected data type. + + Args: + option (str): The name of the option to be validated. + value: The value of the dialect option whose type is to be checked. The type of this parameter + is dynamic and is verified against the expected type in `self.option_datatype_mapping`. + + Returns: + bool: True if the type of the value matches the expected type, or if the option is not found in + `self.option_datatype_mapping`. + + Raises: + TypeError: If the type of the provided value does not match the expected type as defined in + `self.option_datatype_mapping`. + """ + if option in self.option_datatype_mapping: + self._raise_for_type( + option, + value, + self.option_datatype_mapping[option], + ) + + return True + + def _raise_for_type(self, option, value, expected_type): + if type(value) is not expected_type: + raise TypeError( + f"bigquery_{option} dialect option accepts only {expected_type}," + f" provided {repr(value)}" + ) + + def _process_time_partitioning( + self, table: Table, time_partitioning: TimePartitioning + ): + """ + Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp. 
+ + Args: + - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. + - time_partitioning (TimePartitioning): The time partitioning details, + including the field to be used for partitioning. + + Returns: + - str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to + partition data on the specified field. + + Example: + - Given a table with a TIMESTAMP type column 'event_timestamp' and setting + 'time_partitioning.field' to 'event_timestamp', the function returns + "PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)". + """ + field = "_PARTITIONDATE" + trunc_fn = "DATE_TRUNC" + + if time_partitioning.field is not None: + field = time_partitioning.field + if isinstance( + table.columns[time_partitioning.field].type, + sqlalchemy.sql.sqltypes.TIMESTAMP, + ): + trunc_fn = "TIMESTAMP_TRUNC" + + return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})" + + def _process_range_partitioning( + self, table: Table, range_partitioning: RangePartitioning + ): + """ + Generates a SQL 'PARTITION BY' clause for partitioning a table by a range of integers. + + Args: + - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. + - range_partitioning (RangePartitioning): The RangePartitioning object containing the + partitioning field, range start, range end, and interval. + + Returns: + - str: A SQL string for range partitioning using RANGE_BUCKET and GENERATE_ARRAY functions. + + Raises: + - AttributeError: If the partitioning field is not defined. + - ValueError: If the partitioning field (i.e. column) data type is not an integer. + - TypeError: If the partitioning range start/end values are not integers. + + Example: + "PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 10))" + """ + if range_partitioning.field is None: + raise AttributeError( + "bigquery_range_partitioning expects field to be defined" + ) + + if not isinstance( + table.columns[range_partitioning.field].type, + sqlalchemy.sql.sqltypes.INT, + ): + raise ValueError( + "bigquery_range_partitioning expects field (i.e. column) data type to be INTEGER" + ) + + range_ = range_partitioning.range_ + + if not isinstance(range_.start, int): + raise TypeError( + "bigquery_range_partitioning expects range_.start to be an int," + f" provided {repr(range_.start)}" + ) + + if not isinstance(range_.end, int): + raise TypeError( + "bigquery_range_partitioning expects range_.end to be an int," + f" provided {repr(range_.end)}" + ) + + default_interval = 1 + + return f"PARTITION BY RANGE_BUCKET({range_partitioning.field}, GENERATE_ARRAY({range_.start}, {range_.end}, {range_.interval or default_interval}))" + + def _process_option_value(self, value): + """ + Transforms the given option value into a literal representation suitable for SQL queries in BigQuery. + + Args: + value: The value to be transformed. + + Returns: + The processed value in a format suitable for inclusion in a SQL query. + + Raises: + NotImplementedError: When there is no transformation registered for a data type. 
+ """ + option_casting = { + # Mapping from option type to its casting method + str: lambda x: process_string_literal(x), + int: lambda x: x, + float: lambda x: x, + bool: lambda x: "true" if x else "false", + datetime.datetime: lambda x: BQTimestamp.process_timestamp_literal(x), + } + + if (option_cast := option_casting.get(type(value))) is not None: + return option_cast(value) + + raise NotImplementedError(f"No transformation registered for {repr(value)}") + def process_string_literal(value): return repr(value.replace("%", "%%")) @@ -989,25 +1228,8 @@ def get_pk_constraint(self, connection, table_name, schema=None, **kw): return {"constrained_columns": []} def get_indexes(self, connection, table_name, schema=None, **kw): - table = self._get_table(connection, table_name, schema) - indexes = [] - if table.time_partitioning: - indexes.append( - { - "name": "partition", - "column_names": [table.time_partitioning.field], - "unique": False, - } - ) - if table.clustering_fields: - indexes.append( - { - "name": "clustering", - "column_names": table.clustering_fields, - "unique": False, - } - ) - return indexes + # BigQuery has no support for indexes. + return [] def get_schema_names(self, connection, **kw): if isinstance(connection, Engine): diff --git a/sqlalchemy_bigquery/version.py b/sqlalchemy_bigquery/version.py index f15b4f67..04f2b0f7 100644 --- a/sqlalchemy_bigquery/version.py +++ b/sqlalchemy_bigquery/version.py @@ -17,4 +17,4 @@ # IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -__version__ = "1.9.0" +__version__ = "1.10.0" diff --git a/tests/system/test_alembic.py b/tests/system/test_alembic.py index 1948a19a..30308c68 100644 --- a/tests/system/test_alembic.py +++ b/tests/system/test_alembic.py @@ -23,7 +23,7 @@ from sqlalchemy import Column, DateTime, Integer, String, Numeric import google.api_core.exceptions -from google.cloud.bigquery import SchemaField +from google.cloud.bigquery import SchemaField, TimePartitioning alembic = pytest.importorskip("alembic") @@ -138,15 +138,12 @@ def test_alembic_scenario(alembic_table): op.drop_table("accounts") assert alembic_table("accounts") is None - op.execute( - """ - create table transactions( - account INT64 NOT NULL, - transaction_time DATETIME NOT NULL, - amount NUMERIC(11, 2) NOT NULL - ) - partition by DATE(transaction_time) - """ + op.create_table( + "transactions", + Column("account", Integer, nullable=False), + Column("transaction_time", DateTime(), nullable=False), + Column("amount", Numeric(11, 2), nullable=False), + bigquery_time_partitioning=TimePartitioning(field="transaction_time"), ) op.alter_column("transactions", "amount", nullable=True) diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 62b534ff..cccbd4bb 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -22,6 +22,8 @@ import datetime import decimal +from google.cloud.bigquery import TimePartitioning + from sqlalchemy.engine import create_engine from sqlalchemy.schema import Table, MetaData, Column from sqlalchemy.ext.declarative import declarative_base @@ -539,6 +541,14 @@ def test_create_table(engine, bigquery_dataset): Column("binary_c", sqlalchemy.BINARY), bigquery_description="test table description", bigquery_friendly_name="test table name", + bigquery_expiration_timestamp=datetime.datetime(2183, 3, 26, 8, 30, 0), + bigquery_time_partitioning=TimePartitioning( + 
field="timestamp_c", + expiration_ms=1000 * 60 * 60 * 24 * 30, # 30 days + ), + bigquery_require_partition_filter=True, + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_clustering_fields=["integer_c", "decimal_c"], ) meta.create_all(engine) meta.drop_all(engine) @@ -594,17 +604,7 @@ def test_view_names(inspector, inspector_using_test_dataset, bigquery_dataset): def test_get_indexes(inspector, inspector_using_test_dataset, bigquery_dataset): for _ in [f"{bigquery_dataset}.sample", f"{bigquery_dataset}.sample_one_row"]: indexes = inspector.get_indexes(f"{bigquery_dataset}.sample") - assert len(indexes) == 2 - assert indexes[0] == { - "name": "partition", - "column_names": ["timestamp"], - "unique": False, - } - assert indexes[1] == { - "name": "clustering", - "column_names": ["integer", "string"], - "unique": False, - } + assert len(indexes) == 0 def test_get_columns(inspector, inspector_using_test_dataset, bigquery_dataset): diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index f808b380..6f197196 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -25,6 +25,8 @@ import pytest import sqlalchemy +from sqlalchemy_bigquery.base import BigQueryDDLCompiler, BigQueryDialect + from . import fauxdbi sqlalchemy_version = packaging.version.parse(sqlalchemy.__version__) @@ -91,6 +93,11 @@ def metadata(): return sqlalchemy.MetaData() +@pytest.fixture() +def ddl_compiler(): + return BigQueryDDLCompiler(BigQueryDialect(), None) + + def setup_table(connection, name, *columns, initial_data=(), **kw): metadata = sqlalchemy.MetaData() table = sqlalchemy.Table(name, metadata, *columns, **kw) diff --git a/tests/unit/test_catalog_functions.py b/tests/unit/test_catalog_functions.py index 78614c9f..7eab7b7b 100644 --- a/tests/unit/test_catalog_functions.py +++ b/tests/unit/test_catalog_functions.py @@ -126,18 +126,7 @@ def test_get_indexes(faux_conn): client.tables.foo.time_partitioning = TimePartitioning(field="tm") client.tables.foo.clustering_fields = ["user_email", "store_code"] - assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [ - dict( - name="partition", - column_names=["tm"], - unique=False, - ), - dict( - name="clustering", - column_names=["user_email", "store_code"], - unique=False, - ), - ] + assert faux_conn.dialect.get_indexes(faux_conn, "foo") == [] def test_no_table_pk_constraint(faux_conn): diff --git a/tests/unit/test_compiler.py b/tests/unit/test_compiler.py index db02e593..139b6cbc 100644 --- a/tests/unit/test_compiler.py +++ b/tests/unit/test_compiler.py @@ -21,7 +21,7 @@ import sqlalchemy.exc from .conftest import setup_table -from .conftest import sqlalchemy_1_4_or_higher +from .conftest import sqlalchemy_1_4_or_higher, sqlalchemy_before_1_4 def test_constraints_are_ignored(faux_conn, metadata): @@ -114,3 +114,167 @@ def test_no_alias_for_known_tables_cte(faux_conn, metadata): ) found_cte_sql = q.compile(faux_conn).string assert found_cte_sql == expected_cte_sql + + +def prepare_implicit_join_base_query( + faux_conn, metadata, select_from_table2, old_syntax +): + table1 = setup_table( + faux_conn, "table1", metadata, sqlalchemy.Column("foo", sqlalchemy.Integer) + ) + table2 = setup_table( + faux_conn, + "table2", + metadata, + sqlalchemy.Column("foos", sqlalchemy.ARRAY(sqlalchemy.Integer)), + sqlalchemy.Column("bar", sqlalchemy.Integer), + ) + F = sqlalchemy.func + + unnested_col_name = "unnested_foos" + unnested_foos = F.unnest(table2.c.foos).alias(unnested_col_name) + unnested_foo_col = sqlalchemy.Column(unnested_col_name) + + # Set up 
initial query + cols = [table1.c.foo, table2.c.bar] if select_from_table2 else [table1.c.foo] + q = sqlalchemy.select(cols) if old_syntax else sqlalchemy.select(*cols) + q = q.select_from(unnested_foos.join(table1, table1.c.foo == unnested_foo_col)) + return q + + +@sqlalchemy_before_1_4 +def test_no_implicit_join_asterix_for_inner_unnest_before_1_4(faux_conn, metadata): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, True, True) + expected_initial_sql = ( + "SELECT `table1`.`foo`, `table2`.`bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert found_initial_sql == expected_initial_sql + + q = sqlalchemy.select(["*"]).select_from(q) + + expected_outer_sql = ( + "SELECT * \n" + "FROM (SELECT `table1`.`foo` AS `foo`, `table2`.`bar` AS `bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`)" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql + + +@sqlalchemy_1_4_or_higher +def test_no_implicit_join_asterix_for_inner_unnest(faux_conn, metadata): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, True, False) + expected_initial_sql = ( + "SELECT `table1`.`foo`, `table2`.`bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert found_initial_sql == expected_initial_sql + + q = q.subquery() + q = sqlalchemy.select("*").select_from(q) + + expected_outer_sql = ( + "SELECT * \n" + "FROM (SELECT `table1`.`foo` AS `foo`, `table2`.`bar` AS `bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`) AS `anon_1`" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql + + +@sqlalchemy_before_1_4 +def test_no_implicit_join_for_inner_unnest_before_1_4(faux_conn, metadata): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, True, True) + expected_initial_sql = ( + "SELECT `table1`.`foo`, `table2`.`bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert found_initial_sql == expected_initial_sql + + q = sqlalchemy.select([q.c.foo]).select_from(q) + + expected_outer_sql = ( + "SELECT `foo` \n" + "FROM (SELECT `table1`.`foo` AS `foo`, `table2`.`bar` AS `bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`)" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql + + +@sqlalchemy_1_4_or_higher +def test_no_implicit_join_for_inner_unnest(faux_conn, metadata): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, True, False) + expected_initial_sql = ( + "SELECT `table1`.`foo`, `table2`.`bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert 
found_initial_sql == expected_initial_sql + + q = q.subquery() + q = sqlalchemy.select(q.c.foo).select_from(q) + + expected_outer_sql = ( + "SELECT `anon_1`.`foo` \n" + "FROM (SELECT `table1`.`foo` AS `foo`, `table2`.`bar` AS `bar` \n" + "FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`) AS `anon_1`" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql + + +@sqlalchemy_1_4_or_higher +def test_no_implicit_join_asterix_for_inner_unnest_no_table2_column( + faux_conn, metadata +): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, False, False) + expected_initial_sql = ( + "SELECT `table1`.`foo` \n" + "FROM `table2` `table2_1`, unnest(`table2_1`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert found_initial_sql == expected_initial_sql + + q = q.subquery() + q = sqlalchemy.select("*").select_from(q) + + expected_outer_sql = ( + "SELECT * \n" + "FROM (SELECT `table1`.`foo` AS `foo` \n" + "FROM `table2` `table2_1`, unnest(`table2_1`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`) AS `anon_1`" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql + + +@sqlalchemy_1_4_or_higher +def test_no_implicit_join_for_inner_unnest_no_table2_column(faux_conn, metadata): + # See: https://github.com/googleapis/python-bigquery-sqlalchemy/issues/368 + q = prepare_implicit_join_base_query(faux_conn, metadata, False, False) + expected_initial_sql = ( + "SELECT `table1`.`foo` \n" + "FROM `table2` `table2_1`, unnest(`table2_1`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`" + ) + found_initial_sql = q.compile(faux_conn).string + assert found_initial_sql == expected_initial_sql + + q = q.subquery() + q = sqlalchemy.select(q.c.foo).select_from(q) + + expected_outer_sql = ( + "SELECT `anon_1`.`foo` \n" + "FROM (SELECT `table1`.`foo` AS `foo` \n" + "FROM `table2` `table2_1`, unnest(`table2_1`.`foos`) AS `unnested_foos` JOIN `table1` ON `table1`.`foo` = `unnested_foos`) AS `anon_1`" + ) + found_outer_sql = q.compile(faux_conn).string + assert found_outer_sql == expected_outer_sql diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py new file mode 100644 index 00000000..2147fb1d --- /dev/null +++ b/tests/unit/test_table_options.py @@ -0,0 +1,474 @@ +# Copyright (c) 2021 The sqlalchemy-bigquery Authors +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +import datetime +import sqlite3 +import pytest +import sqlalchemy + +from google.cloud.bigquery import ( + PartitionRange, + RangePartitioning, + TimePartitioning, + TimePartitioningType, +) + +from .conftest import setup_table + + +def test_table_expiration_timestamp_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_expiration_timestamp=datetime.datetime.fromisoformat( + "2038-01-01T00:00:00+00:00" + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00')" + ) + + +def test_table_default_rounding_mode_dialect_option(faux_conn): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " OPTIONS(default_rounding_mode='ROUND_HALF_EVEN')" + ) + + +def test_table_clustering_fields_dialect_option_no_such_column(faux_conn): + with pytest.raises(sqlalchemy.exc.NoSuchColumnError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_clustering_fields=["country", "unknown"], + ) + + +def test_table_clustering_fields_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support clustering + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + bigquery_clustering_fields=["country", "town"], + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING )" + " CLUSTER BY country, town" + ) + + +def test_table_clustering_fields_dialect_option_type_error(faux_conn): + # expect TypeError when bigquery_clustering_fields is not a list + with pytest.raises(TypeError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + bigquery_clustering_fields="country, town", + ) + + +def test_table_time_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)" + ) + + +def test_table_require_partition_filter_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with 
pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + bigquery_require_partition_filter=True, + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(require_partition_filter=true)" + ) + + +def test_table_time_partitioning_with_field_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_by_month_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_=TimePartitioningType.MONTH, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, MONTH)" + ) + + +def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP), + bigquery_time_partitioning=TimePartitioning(field="createdAt"), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )" + " PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)" + ) + + +def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn): + # expect table creation to fail as SQLite does not support partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_="DAY", + expiration_ms=21600000, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " OPTIONS(partition_expiration_days=0.25)" + ) + + +def test_table_partitioning_dialect_option_type_error(faux_conn): + # expect TypeError when bigquery_time_partitioning is not a TimePartitioning object + with pytest.raises(TypeError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_time_partitioning="DATE(createdAt)", + ) + + +def test_table_range_partitioning_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not 
support partitioned tables
+    with pytest.raises(sqlite3.OperationalError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.INT),
+            bigquery_range_partitioning=RangePartitioning(
+                field="zipcode",
+                range_=PartitionRange(
+                    start=0,
+                    end=100000,
+                    interval=2,
+                ),
+            ),
+        )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )"
+        " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 2))"
+    )
+
+
+def test_table_range_partitioning_dialect_option_no_field(faux_conn):
+    # expect AttributeError when bigquery_range_partitioning field is not defined
+    with pytest.raises(
+        AttributeError,
+        match="bigquery_range_partitioning expects field to be defined",
+    ):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.FLOAT),
+            bigquery_range_partitioning=RangePartitioning(
+                range_=PartitionRange(
+                    start=0,
+                    end=100000,
+                    interval=10,
+                ),
+            ),
+        )
+
+
+def test_table_range_partitioning_dialect_option_bad_column_type(faux_conn):
+    # expect ValueError when bigquery_range_partitioning field is not an INTEGER
+    with pytest.raises(
+        ValueError,
+        match=r"bigquery_range_partitioning expects field \(i\.e\. column\) data type to be INTEGER",
+    ):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.FLOAT),
+            bigquery_range_partitioning=RangePartitioning(
+                field="zipcode",
+                range_=PartitionRange(
+                    start=0,
+                    end=100000,
+                    interval=10,
+                ),
+            ),
+        )
+
+
+def test_table_range_partitioning_dialect_option_range_missing(faux_conn):
+    # expect TypeError when bigquery_range_partitioning range start or end is missing
+    with pytest.raises(
+        TypeError,
+        match="bigquery_range_partitioning expects range_.start to be an int, provided None",
+    ):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.INT),
+            bigquery_range_partitioning=RangePartitioning(field="zipcode"),
+        )
+
+    with pytest.raises(
+        TypeError,
+        match="bigquery_range_partitioning expects range_.end to be an int, provided None",
+    ):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.INT),
+            bigquery_range_partitioning=RangePartitioning(
+                field="zipcode",
+                range_=PartitionRange(start=1),
+            ),
+        )
+
+
+def test_table_range_partitioning_dialect_option_default_interval(faux_conn):
+    # expect table creation to fail as SQLite does not support partitioned tables
+    with pytest.raises(sqlite3.OperationalError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", sqlalchemy.Integer),
+            sqlalchemy.Column("zipcode", sqlalchemy.INT),
+            bigquery_range_partitioning=RangePartitioning(
+                field="zipcode",
+                range_=PartitionRange(
+                    start=0,
+                    end=100000,
+                ),
+            ),
+        )
+
+    assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == (
+        "CREATE TABLE `some_table` ( `id` INT64, `zipcode` INT64 )"
+        " PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 1))"
+    )
+
+
+def test_time_and_range_partitioning_mutually_exclusive(faux_conn):
+    # expect ValueError when both bigquery_time_partitioning and bigquery_range_partitioning are provided
+    with pytest.raises(ValueError):
+        setup_table(
+            faux_conn,
+            "some_table",
+            sqlalchemy.Column("id", 
sqlalchemy.Integer), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_range_partitioning=RangePartitioning(), + bigquery_time_partitioning=TimePartitioning(), + ) + + +def test_table_all_dialect_option(faux_conn): + # expect table creation to fail as SQLite does not support clustering and partitioned tables + with pytest.raises(sqlite3.OperationalError): + setup_table( + faux_conn, + "some_table", + sqlalchemy.Column("id", sqlalchemy.Integer), + sqlalchemy.Column("country", sqlalchemy.Text), + sqlalchemy.Column("town", sqlalchemy.Text), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), + bigquery_expiration_timestamp=datetime.datetime.fromisoformat( + "2038-01-01T00:00:00+00:00" + ), + bigquery_require_partition_filter=True, + bigquery_default_rounding_mode="ROUND_HALF_EVEN", + bigquery_clustering_fields=["country", "town"], + bigquery_time_partitioning=TimePartitioning( + field="createdAt", + type_="DAY", + expiration_ms=2592000000, + ), + ) + + assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )" + " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " CLUSTER BY country, town" + " OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" + ) + + +def test_validate_friendly_name_value_type(ddl_compiler): + # expect option value to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type("friendly_name", "Friendly name") + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("friendly_name", 1983) + + +def test_validate_expiration_timestamp_value_type(ddl_compiler): + # expect option value to be transformed as a timestamp expression + + assert ddl_compiler._validate_option_value_type( + "expiration_timestamp", + datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00"), + ) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("expiration_timestamp", "2038-01-01") + + +def test_validate_require_partition_filter_type(ddl_compiler): + # expect option value to be transformed as a literal boolean + + assert ddl_compiler._validate_option_value_type("require_partition_filter", True) + assert ddl_compiler._validate_option_value_type("require_partition_filter", False) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("require_partition_filter", "true") + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("require_partition_filter", "false") + + +def test_validate_default_rounding_mode_type(ddl_compiler): + # expect option value to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type( + "default_rounding_mode", "ROUND_HALF_EVEN" + ) + + with pytest.raises(TypeError): + ddl_compiler._validate_option_value_type("default_rounding_mode", True) + + +def test_validate_unmapped_option_type(ddl_compiler): + # expect option value with no typed specified in mapping to be transformed as a string expression + + assert ddl_compiler._validate_option_value_type("unknown", "DEFAULT_IS_STRING") + + +def test_process_str_option_value(ddl_compiler): + # expect string to be transformed as a string expression + assert ddl_compiler._process_option_value("Some text") == "'Some text'" + + +def test_process_datetime_value(ddl_compiler): + # expect datetime object to be transformed as a timestamp 
expression
+    assert (
+        ddl_compiler._process_option_value(
+            datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00")
+        )
+        == "TIMESTAMP '2038-01-01 00:00:00+00:00'"
+    )
+
+
+def test_process_int_option_value(ddl_compiler):
+    # expect int to be unchanged
+    assert ddl_compiler._process_option_value(90) == 90
+
+
+def test_process_boolean_option_value(ddl_compiler):
+    # expect boolean to be transformed as a literal boolean expression
+
+    assert ddl_compiler._process_option_value(True) == "true"
+    assert ddl_compiler._process_option_value(False) == "false"
+
+
+def test_process_not_implemented_option_value(ddl_compiler):
+    # expect NotImplementedError when no transformation is registered
+    # for the value's type
+    with pytest.raises(NotImplementedError):
+        ddl_compiler._process_option_value(float)
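
Notes on the changes above. The sketches below are illustrative only and are not part of the patch; any table, column, or connection names they introduce are made up.

The `_known_tables` change in `sqlalchemy_bigquery/base.py` fixes the implicit-join bug (#924): tables already present in the parent SELECT's FROM list are now treated as known, so selecting a column of `table2` next to `unnest(table2.foos)` no longer re-adds `table2` and silently self-joins it. A minimal sketch of the query shape this affects, mirroring the new tests in `tests/unit/test_compiler.py` (SQLAlchemy 1.4+ select syntax assumed):

.. code-block:: python

    import sqlalchemy
    from sqlalchemy_bigquery import BigQueryDialect

    metadata = sqlalchemy.MetaData()
    table1 = sqlalchemy.Table(
        "table1", metadata, sqlalchemy.Column("foo", sqlalchemy.Integer)
    )
    table2 = sqlalchemy.Table(
        "table2",
        metadata,
        sqlalchemy.Column("foos", sqlalchemy.ARRAY(sqlalchemy.Integer)),
        sqlalchemy.Column("bar", sqlalchemy.Integer),
    )

    # Join table1 against the elements of the table2.foos array.
    unnested_foos = sqlalchemy.func.unnest(table2.c.foos).alias("unnested_foos")
    q = sqlalchemy.select(table1.c.foo, table2.c.bar).select_from(
        unnested_foos.join(table1, table1.c.foo == sqlalchemy.column("unnested_foos"))
    )

    # table2 now appears exactly once in FROM:
    #   SELECT `table1`.`foo`, `table2`.`bar`
    #   FROM `table2`, unnest(`table2`.`foos`) AS `unnested_foos`
    #   JOIN `table1` ON `table1`.`foo` = `unnested_foos`
    print(q.compile(dialect=BigQueryDialect()))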
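The new `post_create_table()` logic (#928) turns the `bigquery_*` dialect options into `PARTITION BY`, `CLUSTER BY`, and `OPTIONS(...)` clauses. The emitted DDL can be inspected offline with a plain `CreateTable` compile; a sketch, with made-up table and column names and the expected output taken from the unit tests above:

.. code-block:: python

    import sqlalchemy
    from sqlalchemy.schema import CreateTable
    from google.cloud import bigquery
    from sqlalchemy_bigquery import BigQueryDialect

    metadata = sqlalchemy.MetaData()
    events = sqlalchemy.Table(
        "events",
        metadata,
        sqlalchemy.Column("id", sqlalchemy.Integer),
        sqlalchemy.Column("created_at", sqlalchemy.DateTime),
        bigquery_time_partitioning=bigquery.TimePartitioning(field="created_at"),
        bigquery_clustering_fields=["id"],
        bigquery_require_partition_filter=True,
    )

    # Roughly:
    #   CREATE TABLE `events` ( `id` INT64, `created_at` DATETIME )
    #   PARTITION BY DATE_TRUNC(created_at, DAY)
    #   CLUSTER BY id
    #   OPTIONS(require_partition_filter=true)
    # A TIMESTAMP column would get TIMESTAMP_TRUNC instead of DATE_TRUNC.
    print(CreateTable(events).compile(dialect=BigQueryDialect()))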
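`TimePartitioning.expiration_ms` is converted into the `partition_expiration_days` OPTIONS entry (milliseconds divided by 86,400,000, so 21,600,000 ms becomes 0.25 days in the tests). Continuing the previous sketch (same imports and `metadata`):

.. code-block:: python

    expiring = sqlalchemy.Table(
        "expiring",
        metadata,
        sqlalchemy.Column("created_at", sqlalchemy.DateTime),
        bigquery_time_partitioning=bigquery.TimePartitioning(
            field="created_at",
            expiration_ms=1000 * 60 * 60 * 24 * 30,  # 30 days
        ),
    )

    # Emits "... OPTIONS(partition_expiration_days=30.0)"
    print(CreateTable(expiring).compile(dialect=BigQueryDialect()))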
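`_process_range_partitioning()` covers the integer-range case; the partitioning field must be an INTEGER column, `range_.start` and `range_.end` are required, and `range_.interval` defaults to 1 when omitted. Again continuing the same compile-only setup:

.. code-block:: python

    customers = sqlalchemy.Table(
        "customers",
        metadata,
        sqlalchemy.Column("zipcode", sqlalchemy.Integer),
        bigquery_range_partitioning=bigquery.RangePartitioning(
            field="zipcode",
            range_=bigquery.PartitionRange(start=0, end=100000, interval=10),
        ),
    )

    # Emits "... PARTITION BY RANGE_BUCKET(zipcode, GENERATE_ARRAY(0, 100000, 10))"
    print(CreateTable(customers).compile(dialect=BigQueryDialect()))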
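`_validate_option_value_type()` and `_process_option_value()` decide how each OPTIONS value is rendered: strings become quoted literals, booleans become lowercase `true`/`false`, datetimes become `TIMESTAMP` literals, and ints/floats pass through. A quick way to see this directly, using the same construction as the new `ddl_compiler` fixture in `tests/unit/conftest.py`:

.. code-block:: python

    import datetime

    from sqlalchemy_bigquery.base import BigQueryDDLCompiler, BigQueryDialect

    compiler = BigQueryDDLCompiler(BigQueryDialect(), None)

    print(compiler._process_option_value("ROUND_HALF_EVEN"))  # 'ROUND_HALF_EVEN'
    print(compiler._process_option_value(True))               # true
    print(compiler._process_option_value(
        datetime.datetime.fromisoformat("2038-01-01T00:00:00+00:00")
    ))  # TIMESTAMP '2038-01-01 00:00:00+00:00'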
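`get_indexes()` no longer synthesizes pseudo-indexes named `partition` and `clustering` from table metadata; BigQuery has no real indexes, so reflection now always reports none. Code that relied on those entries should read `Table.time_partitioning` / `Table.clustering_fields` from the BigQuery client instead. A sketch, with a placeholder connection URL:

.. code-block:: python

    from sqlalchemy import create_engine, inspect

    engine = create_engine("bigquery://my-project/my_dataset")  # placeholder project/dataset
    inspector = inspect(engine)

    # Previously: [{'name': 'partition', ...}, {'name': 'clustering', ...}]
    # Now always an empty list.
    print(inspector.get_indexes("my_table"))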
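Because the new options are plain dialect kwargs, they also pass through Alembic, which is what the rewritten `test_alembic_scenario` exercises: the raw `CREATE TABLE ... partition by` SQL becomes a regular `op.create_table()` call. A sketch of the equivalent step inside a migration's `upgrade()`:

.. code-block:: python

    from alembic import op
    import sqlalchemy as sa
    from google.cloud.bigquery import TimePartitioning


    def upgrade():
        op.create_table(
            "transactions",
            sa.Column("account", sa.Integer, nullable=False),
            sa.Column("transaction_time", sa.DateTime(), nullable=False),
            sa.Column("amount", sa.Numeric(11, 2), nullable=False),
            # Compiles to: PARTITION BY DATE_TRUNC(transaction_time, DAY)
            bigquery_time_partitioning=TimePartitioning(field="transaction_time"),
        )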