diff --git a/bigquery_storage/__init__.py b/bigquery_storage/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/bigquery_storage/conftest.py b/bigquery_storage/conftest.py new file mode 100644 index 00000000000..63d53531471 --- /dev/null +++ b/bigquery_storage/conftest.py @@ -0,0 +1,46 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import os +import random +from typing import Generator + +from google.cloud import bigquery + +import pytest + + +@pytest.fixture(scope="session") +def project_id() -> str: + return os.environ["GOOGLE_CLOUD_PROJECT"] + + +@pytest.fixture(scope="session") +def dataset(project_id: str) -> Generator[bigquery.Dataset, None, None]: + client = bigquery.Client() + + # Add a random suffix to dataset name to avoid conflict, because we run + # a samples test on each supported Python version almost at the same time. + dataset_time = datetime.datetime.now().strftime("%y%m%d_%H%M%S") + suffix = f"_{(random.randint(0, 99)):02d}" + dataset_name = "samples_tests_" + dataset_time + suffix + + dataset_id = "{}.{}".format(project_id, dataset_name) + dataset = bigquery.Dataset(dataset_id) + dataset.location = "us-east7" + created_dataset = client.create_dataset(dataset) + yield created_dataset + + client.delete_dataset(created_dataset, delete_contents=True) diff --git a/bigquery_storage/pyarrow/__init__.py b/bigquery_storage/pyarrow/__init__.py new file mode 100644 index 00000000000..a2a70562f48 --- /dev/null +++ b/bigquery_storage/pyarrow/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/bigquery_storage/pyarrow/append_rows_with_arrow.py b/bigquery_storage/pyarrow/append_rows_with_arrow.py new file mode 100644 index 00000000000..78cb0a57573 --- /dev/null +++ b/bigquery_storage/pyarrow/append_rows_with_arrow.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +from concurrent.futures import Future +import datetime +import decimal +from typing import Iterable + +from google.cloud import bigquery +from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery import enums +from google.cloud.bigquery_storage_v1 import types as gapic_types +from google.cloud.bigquery_storage_v1.writer import AppendRowsStream +import pandas as pd +import pyarrow as pa + + +TABLE_LENGTH = 100_000 + +BQ_SCHEMA = [ + bigquery.SchemaField("bool_col", enums.SqlTypeNames.BOOLEAN), + bigquery.SchemaField("int64_col", enums.SqlTypeNames.INT64), + bigquery.SchemaField("float64_col", enums.SqlTypeNames.FLOAT64), + bigquery.SchemaField("numeric_col", enums.SqlTypeNames.NUMERIC), + bigquery.SchemaField("bignumeric_col", enums.SqlTypeNames.BIGNUMERIC), + bigquery.SchemaField("string_col", enums.SqlTypeNames.STRING), + bigquery.SchemaField("bytes_col", enums.SqlTypeNames.BYTES), + bigquery.SchemaField("date_col", enums.SqlTypeNames.DATE), + bigquery.SchemaField("datetime_col", enums.SqlTypeNames.DATETIME), + bigquery.SchemaField("time_col", enums.SqlTypeNames.TIME), + bigquery.SchemaField("timestamp_col", enums.SqlTypeNames.TIMESTAMP), + bigquery.SchemaField("geography_col", enums.SqlTypeNames.GEOGRAPHY), + bigquery.SchemaField( + "range_date_col", enums.SqlTypeNames.RANGE, range_element_type="DATE" + ), + bigquery.SchemaField( + "range_datetime_col", + enums.SqlTypeNames.RANGE, + range_element_type="DATETIME", + ), + bigquery.SchemaField( + "range_timestamp_col", + enums.SqlTypeNames.RANGE, + range_element_type="TIMESTAMP", + ), +] + +PYARROW_SCHEMA = pa.schema( + [ + pa.field("bool_col", pa.bool_()), + pa.field("int64_col", pa.int64()), + pa.field("float64_col", pa.float64()), + pa.field("numeric_col", pa.decimal128(38, scale=9)), + pa.field("bignumeric_col", pa.decimal256(76, scale=38)), + pa.field("string_col", pa.string()), + pa.field("bytes_col", pa.binary()), + pa.field("date_col", pa.date32()), + pa.field("datetime_col", pa.timestamp("us")), + pa.field("time_col", pa.time64("us")), + pa.field("timestamp_col", pa.timestamp("us")), + pa.field("geography_col", pa.string()), + pa.field( + "range_date_col", + pa.struct([("start", pa.date32()), ("end", pa.date32())]), + ), + pa.field( + "range_datetime_col", + pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]), + ), + pa.field( + "range_timestamp_col", + pa.struct([("start", pa.timestamp("us")), ("end", pa.timestamp("us"))]), + ), + ] +) + + +def bqstorage_write_client() -> bigquery_storage_v1.BigQueryWriteClient: + return bigquery_storage_v1.BigQueryWriteClient() + + +def make_table(project_id: str, dataset_id: str, bq_client: bigquery.Client) -> bigquery.Table: + table_id = "append_rows_w_arrow_test" + table_id_full = f"{project_id}.{dataset_id}.{table_id}" + bq_table = bigquery.Table(table_id_full, schema=BQ_SCHEMA) + created_table = bq_client.create_table(bq_table) + + return created_table + + +def create_stream(bqstorage_write_client: bigquery_storage_v1.BigQueryWriteClient, table: bigquery.Table) -> AppendRowsStream: + stream_name = f"projects/{table.project}/datasets/{table.dataset_id}/tables/{table.table_id}/_default" + request_template = gapic_types.AppendRowsRequest() + request_template.write_stream = stream_name + + # Add schema to the template. 
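Note: the lines that follow attach the writer schema once, in the request template — `PYARROW_SCHEMA.serialize().to_pybytes()` produces an Arrow IPC schema message that the service uses to decode every later record batch. A minimal local sketch of that round-trip (illustrative only, not part of this diff):

```python
# Sketch: serialize a pyarrow schema the way create_stream() does, then
# read it back to confirm the bytes form a self-contained IPC message.
import pyarrow as pa

schema = pa.schema([("bool_col", pa.bool_()), ("int64_col", pa.int64())])
payload = schema.serialize().to_pybytes()  # bytes sent as serialized_schema

restored = pa.ipc.read_schema(pa.py_buffer(payload))
assert restored.equals(schema)
```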
+ arrow_data = gapic_types.AppendRowsRequest.ArrowData() + arrow_data.writer_schema.serialized_schema = PYARROW_SCHEMA.serialize().to_pybytes() + request_template.arrow_rows = arrow_data + + append_rows_stream = AppendRowsStream( + bqstorage_write_client, + request_template, + ) + return append_rows_stream + + +def generate_pyarrow_table(num_rows: int = TABLE_LENGTH) -> pa.Table: + date_1 = datetime.date(2020, 10, 1) + date_2 = datetime.date(2021, 10, 1) + + datetime_1 = datetime.datetime(2016, 12, 3, 14, 11, 27, 123456) + datetime_2 = datetime.datetime(2017, 12, 3, 14, 11, 27, 123456) + + timestamp_1 = datetime.datetime( + 1999, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc + ) + timestamp_2 = datetime.datetime( + 2000, 12, 31, 23, 59, 59, 999999, tzinfo=datetime.timezone.utc + ) + + # Pandas Dataframe. + rows = [] + for i in range(num_rows): + row = { + "bool_col": True, + "int64_col": i, + "float64_col": float(i), + "numeric_col": decimal.Decimal("0.000000001"), + "bignumeric_col": decimal.Decimal("0.1234567891"), + "string_col": "data as string", + "bytes_col": str.encode("data in bytes"), + "date_col": datetime.date(2019, 5, 10), + "datetime_col": datetime_1, + "time_col": datetime.time(23, 59, 59, 999999), + "timestamp_col": timestamp_1, + "geography_col": "POINT(-121 41)", + "range_date_col": {"start": date_1, "end": date_2}, + "range_datetime_col": {"start": datetime_1, "end": datetime_2}, + "range_timestamp_col": {"start": timestamp_1, "end": timestamp_2}, + } + rows.append(row) + df = pd.DataFrame(rows) + + # Dataframe to PyArrow Table. + table = pa.Table.from_pandas(df, schema=PYARROW_SCHEMA) + + return table + + +def generate_write_requests( + pyarrow_table: pa.Table, +) -> Iterable[gapic_types.AppendRowsRequest]: + # Determine max_chunksize of the record batches. Because max size of + # AppendRowsRequest is 10 MB, we need to split the table if it's too big. + # See: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#appendrowsrequest + max_request_bytes = 10 * 2**20 # 10 MB + chunk_num = int(pyarrow_table.nbytes / max_request_bytes) + 1 + chunk_size = int(pyarrow_table.num_rows / chunk_num) + + # Construct request(s). + for batch in pyarrow_table.to_batches(max_chunksize=chunk_size): + request = gapic_types.AppendRowsRequest() + request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes() + yield request + + +def verify_result( + client: bigquery.Client, table: bigquery.Table, futures: "list[Future]" +) -> None: + bq_table = client.get_table(table) + + # Verify table schema. + assert bq_table.schema == BQ_SCHEMA + + # Verify table size. + query = client.query(f"SELECT COUNT(1) FROM `{bq_table}`;") + query_result = query.result().to_dataframe() + + # There might be extra rows due to retries. + assert query_result.iloc[0, 0] >= TABLE_LENGTH + + # Verify that table was split into multiple requests. + assert len(futures) == 2 + + +def main(project_id: str, dataset: bigquery.Dataset) -> None: + # Initialize clients. + write_client = bqstorage_write_client() + bq_client = bigquery.Client() + + # Create BigQuery table. + bq_table = make_table(project_id, dataset.dataset_id, bq_client) + + # Generate local PyArrow table. + pa_table = generate_pyarrow_table() + + # Convert PyArrow table to Protobuf requests. + requests = generate_write_requests(pa_table) + + # Create writing stream to the BigQuery table. + stream = create_stream(write_client, bq_table) + + # Send requests. 
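Note: the batch splitting in `generate_write_requests()` above sizes chunks by row count, which assumes roughly uniform row width. A worked example of the arithmetic with hypothetical sizes (only the 10 MB request cap comes from the API docs; the byte counts below are made up for illustration):

```python
# Hypothetical numbers tracing the chunk computation in
# generate_write_requests().
max_request_bytes = 10 * 2**20   # 10 MiB AppendRowsRequest limit
table_nbytes = 25 * 2**20        # suppose the Arrow table is ~25 MiB
num_rows = 100_000

chunk_num = int(table_nbytes / max_request_bytes) + 1   # -> 3
chunk_size = int(num_rows / chunk_num)                  # -> 33_333 rows/batch
# Each batch is then ~25 MiB * (33_333 / 100_000) ~= 8.3 MiB, under the cap.
```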
+ futures = [] + for request in requests: + future = stream.send(request) + futures.append(future) + future.result() # Optional, will block until writing is complete. + + # Verify results. + verify_result(bq_client, bq_table, futures) diff --git a/bigquery_storage/pyarrow/append_rows_with_arrow_test.py b/bigquery_storage/pyarrow/append_rows_with_arrow_test.py new file mode 100644 index 00000000000..f31de43b51f --- /dev/null +++ b/bigquery_storage/pyarrow/append_rows_with_arrow_test.py @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud import bigquery + +from . import append_rows_with_arrow + + +def test_append_rows_with_arrow(project_id: str, dataset: bigquery.Dataset) -> None: + append_rows_with_arrow.main(project_id, dataset) diff --git a/bigquery_storage/pyarrow/noxfile_config.py b/bigquery_storage/pyarrow/noxfile_config.py new file mode 100644 index 00000000000..29edb31ffe8 --- /dev/null +++ b/bigquery_storage/pyarrow/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be imported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. 
+ "envs": {}, +} diff --git a/bigquery_storage/pyarrow/requirements-test.txt b/bigquery_storage/pyarrow/requirements-test.txt new file mode 100644 index 00000000000..7561ed55ce2 --- /dev/null +++ b/bigquery_storage/pyarrow/requirements-test.txt @@ -0,0 +1,3 @@ +pytest===7.4.3; python_version == '3.7' +pytest===8.3.5; python_version == '3.8' +pytest==8.4.1; python_version >= '3.9' diff --git a/bigquery_storage/pyarrow/requirements.txt b/bigquery_storage/pyarrow/requirements.txt new file mode 100644 index 00000000000..a593373b829 --- /dev/null +++ b/bigquery_storage/pyarrow/requirements.txt @@ -0,0 +1,5 @@ +db_dtypes +google-cloud-bigquery +google-cloud-bigquery-storage +pandas +pyarrow diff --git a/bigquery_storage/quickstart/__init__.py b/bigquery_storage/quickstart/__init__.py new file mode 100644 index 00000000000..a2a70562f48 --- /dev/null +++ b/bigquery_storage/quickstart/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/bigquery_storage/quickstart/noxfile_config.py b/bigquery_storage/quickstart/noxfile_config.py new file mode 100644 index 00000000000..f1fa9e5618b --- /dev/null +++ b/bigquery_storage/quickstart/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be imported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. 
+ "envs": {}, +} diff --git a/bigquery_storage/quickstart/quickstart.py b/bigquery_storage/quickstart/quickstart.py new file mode 100644 index 00000000000..6f120ce9a58 --- /dev/null +++ b/bigquery_storage/quickstart/quickstart.py @@ -0,0 +1,95 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + + +def main(project_id: str = "your-project-id", snapshot_millis: int = 0) -> None: + # [START bigquerystorage_quickstart] + from google.cloud.bigquery_storage import BigQueryReadClient, types + + # TODO(developer): Set the project_id variable. + # project_id = 'your-project-id' + # + # The read session is created in this project. This project can be + # different from that which contains the table. + + client = BigQueryReadClient() + + # This example reads baby name data from the public datasets. + table = "projects/{}/datasets/{}/tables/{}".format( + "bigquery-public-data", "usa_names", "usa_1910_current" + ) + + requested_session = types.ReadSession() + requested_session.table = table + # This API can also deliver data serialized in Apache Arrow format. + # This example leverages Apache Avro. + requested_session.data_format = types.DataFormat.AVRO + + # We limit the output columns to a subset of those allowed in the table, + # and set a simple filter to only report names from the state of + # Washington (WA). + requested_session.read_options.selected_fields = ["name", "number", "state"] + requested_session.read_options.row_restriction = 'state = "WA"' + + # Set a snapshot time if it's been specified. + if snapshot_millis > 0: + snapshot_time = types.Timestamp() + snapshot_time.FromMilliseconds(snapshot_millis) + requested_session.table_modifiers.snapshot_time = snapshot_time + + parent = "projects/{}".format(project_id) + session = client.create_read_session( + parent=parent, + read_session=requested_session, + # We'll use only a single stream for reading data from the table. However, + # if you wanted to fan out multiple readers you could do so by having a + # reader process each individual stream. + max_stream_count=1, + ) + reader = client.read_rows(session.streams[0].name) + + # The read stream contains blocks of Avro-encoded bytes. The rows() method + # uses the fastavro library to parse these blocks as an iterable of Python + # dictionaries. Install fastavro with the following command: + # + # pip install google-cloud-bigquery-storage[fastavro] + rows = reader.rows(session) + + # Do any local processing by iterating over the rows. The + # google-cloud-bigquery-storage client reconnects to the API after any + # transient network errors or timeouts. + names = set() + states = set() + + # fastavro raises EOFError instead of StopIteration starting v1.8.4. 
+ # See https://github.com/googleapis/python-bigquery-storage/pull/687 + try: + for row in rows: + names.add(row["name"]) + states.add(row["state"]) + except EOFError: + pass + + print("Got {} unique names in states: {}".format(len(names), ", ".join(states))) + # [END bigquerystorage_quickstart] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("project_id") + parser.add_argument("--snapshot_millis", default=0, type=int) + args = parser.parse_args() + main(project_id=args.project_id, snapshot_millis=args.snapshot_millis) diff --git a/bigquery_storage/quickstart/quickstart_test.py b/bigquery_storage/quickstart/quickstart_test.py new file mode 100644 index 00000000000..3380c923847 --- /dev/null +++ b/bigquery_storage/quickstart/quickstart_test.py @@ -0,0 +1,40 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +import pytest + +from . import quickstart + + +def now_millis() -> int: + return int( + (datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)).total_seconds() + * 1000 + ) + + +def test_quickstart_wo_snapshot(capsys: pytest.CaptureFixture, project_id: str) -> None: + quickstart.main(project_id) + out, _ = capsys.readouterr() + assert "unique names in states: WA" in out + + +def test_quickstart_with_snapshot( + capsys: pytest.CaptureFixture, project_id: str +) -> None: + quickstart.main(project_id, now_millis() - 5000) + out, _ = capsys.readouterr() + assert "unique names in states: WA" in out diff --git a/bigquery_storage/quickstart/requirements-test.txt b/bigquery_storage/quickstart/requirements-test.txt new file mode 100644 index 00000000000..7561ed55ce2 --- /dev/null +++ b/bigquery_storage/quickstart/requirements-test.txt @@ -0,0 +1,3 @@ +pytest===7.4.3; python_version == '3.7' +pytest===8.3.5; python_version == '3.8' +pytest==8.4.1; python_version >= '3.9' diff --git a/bigquery_storage/quickstart/requirements.txt b/bigquery_storage/quickstart/requirements.txt new file mode 100644 index 00000000000..9d69822935d --- /dev/null +++ b/bigquery_storage/quickstart/requirements.txt @@ -0,0 +1,3 @@ +fastavro +google-cloud-bigquery +google-cloud-bigquery-storage==2.32.0 diff --git a/bigquery_storage/snippets/__init__.py b/bigquery_storage/snippets/__init__.py new file mode 100644 index 00000000000..0098709d195 --- /dev/null +++ b/bigquery_storage/snippets/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
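Note: the quickstart pins `types.DataFormat.AVRO`, but as its comment says, the API can also serve Arrow. A hedged sketch of the Arrow variant (same public table; `your-project-id` is a placeholder, and `to_dataframe()` additionally requires pyarrow and pandas to be installed):

```python
# Sketch: the quickstart's read session requesting Arrow instead of Avro.
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
requested_session = types.ReadSession()
requested_session.table = (
    "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"
)
requested_session.data_format = types.DataFormat.ARROW
requested_session.read_options.selected_fields = ["name", "number", "state"]
requested_session.read_options.row_restriction = 'state = "WA"'

session = client.create_read_session(
    parent="projects/your-project-id",  # placeholder project
    read_session=requested_session,
    max_stream_count=1,
)
reader = client.read_rows(session.streams[0].name)
dataframe = reader.to_dataframe(session)  # decodes the Arrow record batches
print(dataframe.head())
```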
diff --git a/bigquery_storage/snippets/append_rows_pending.py b/bigquery_storage/snippets/append_rows_pending.py new file mode 100644 index 00000000000..3c34b472cde --- /dev/null +++ b/bigquery_storage/snippets/append_rows_pending.py @@ -0,0 +1,132 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START bigquerystorage_append_rows_pending] +""" +This code sample demonstrates how to write records in pending mode +using the low-level generated client for Python. +""" + +from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery_storage_v1 import types, writer +from google.protobuf import descriptor_pb2 + +# If you update the customer_record.proto protocol buffer definition, run: +# +# protoc --python_out=. customer_record.proto +# +# from the samples/snippets directory to generate the customer_record_pb2.py module. +from . import customer_record_pb2 + + +def create_row_data(row_num: int, name: str) -> bytes: + row = customer_record_pb2.CustomerRecord() + row.row_num = row_num + row.customer_name = name + return row.SerializeToString() + + +def append_rows_pending(project_id: str, dataset_id: str, table_id: str) -> None: + """Create a write stream, write some sample data, and commit the stream.""" + write_client = bigquery_storage_v1.BigQueryWriteClient() + parent = write_client.table_path(project_id, dataset_id, table_id) + write_stream = types.WriteStream() + + # When creating the stream, choose the type. Use the PENDING type to wait + # until the stream is committed before it is visible. See: + # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type + write_stream.type_ = types.WriteStream.Type.PENDING + write_stream = write_client.create_write_stream( + parent=parent, write_stream=write_stream + ) + stream_name = write_stream.name + + # Create a template with fields needed for the first request. + request_template = types.AppendRowsRequest() + + # The initial request must contain the stream name. + request_template.write_stream = stream_name + + # So that BigQuery knows how to parse the serialized_rows, generate a + # protocol buffer representation of your message descriptor. + proto_schema = types.ProtoSchema() + proto_descriptor = descriptor_pb2.DescriptorProto() + customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor) + proto_schema.proto_descriptor = proto_descriptor + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.writer_schema = proto_schema + request_template.proto_rows = proto_data + + # Some stream types support an unbounded number of requests. Construct an + # AppendRowsStream to send an arbitrary number of requests to a stream. + append_rows_stream = writer.AppendRowsStream(write_client, request_template) + + # Create a batch of row data by appending proto2 serialized bytes to the + # serialized_rows repeated field. 
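Note: `create_row_data()` returns proto2 wire-format bytes, which are what get appended to `serialized_rows` just below. A quick local round-trip check using the generated module from this diff (assumes `customer_record_pb2` is importable from the working directory):

```python
# Sketch: verify one serialized row by parsing it back through the
# generated proto2 class; SerializeToString/ParseFromString are inverses.
from customer_record_pb2 import CustomerRecord

row = CustomerRecord()
row.row_num = 1
row.customer_name = "Alice"
payload = row.SerializeToString()  # proto2 wire-format bytes

parsed = CustomerRecord()
parsed.ParseFromString(payload)
assert parsed.row_num == 1 and parsed.customer_name == "Alice"
```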
+ proto_rows = types.ProtoRows() + proto_rows.serialized_rows.append(create_row_data(1, "Alice")) + proto_rows.serialized_rows.append(create_row_data(2, "Bob")) + + # Set an offset to allow resuming this stream if the connection breaks. + # Keep track of which requests the server has acknowledged and resume the + # stream at the first non-acknowledged message. If the server has already + # processed a message with that offset, it will return an ALREADY_EXISTS + # error, which can be safely ignored. + # + # The first request must always have an offset of 0. + request = types.AppendRowsRequest() + request.offset = 0 + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + response_future_1 = append_rows_stream.send(request) + + # Send another batch. + proto_rows = types.ProtoRows() + proto_rows.serialized_rows.append(create_row_data(3, "Charles")) + + # Since this is the second request, you only need to include the row data. + # The name of the stream and protocol buffers DESCRIPTOR is only needed in + # the first request. + request = types.AppendRowsRequest() + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + # Offset must equal the number of rows that were previously sent. + request.offset = 2 + + response_future_2 = append_rows_stream.send(request) + + print(response_future_1.result()) + print(response_future_2.result()) + + # Shutdown background threads and close the streaming connection. + append_rows_stream.close() + + # A PENDING type stream must be "finalized" before being committed. No new + # records can be written to the stream after this method has been called. + write_client.finalize_write_stream(name=write_stream.name) + + # Commit the stream you created earlier. + batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest() + batch_commit_write_streams_request.parent = parent + batch_commit_write_streams_request.write_streams = [write_stream.name] + write_client.batch_commit_write_streams(batch_commit_write_streams_request) + + print(f"Writes to stream: '{write_stream.name}' have been committed.") + + +# [END bigquerystorage_append_rows_pending] diff --git a/bigquery_storage/snippets/append_rows_pending_test.py b/bigquery_storage/snippets/append_rows_pending_test.py new file mode 100644 index 00000000000..791e9609779 --- /dev/null +++ b/bigquery_storage/snippets/append_rows_pending_test.py @@ -0,0 +1,72 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pathlib +import random + +from google.cloud import bigquery +import pytest + +from . 
import append_rows_pending + +DIR = pathlib.Path(__file__).parent + + +regions = ["US", "non-US"] + + +@pytest.fixture(params=regions) +def sample_data_table( + request: pytest.FixtureRequest, + bigquery_client: bigquery.Client, + project_id: str, + dataset_id: str, + dataset_id_non_us: str, +) -> str: + dataset = dataset_id + if request.param != "US": + dataset = dataset_id_non_us + schema = bigquery_client.schema_from_json(str(DIR / "customer_record_schema.json")) + table_id = f"append_rows_proto2_{random.randrange(10000)}" + full_table_id = f"{project_id}.{dataset}.{table_id}" + table = bigquery.Table(full_table_id, schema=schema) + table = bigquery_client.create_table(table, exists_ok=True) + yield full_table_id + bigquery_client.delete_table(table, not_found_ok=True) + + +def test_append_rows_pending( + capsys: pytest.CaptureFixture, + bigquery_client: bigquery.Client, + sample_data_table: str, +) -> None: + project_id, dataset_id, table_id = sample_data_table.split(".") + append_rows_pending.append_rows_pending( + project_id=project_id, dataset_id=dataset_id, table_id=table_id + ) + out, _ = capsys.readouterr() + assert "have been committed" in out + + rows = bigquery_client.query( + f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`" + ).result() + row_items = [ + # Convert to sorted tuple of items to more easily search for expected rows. + tuple(sorted(row.items())) + for row in rows + ] + + assert (("customer_name", "Alice"), ("row_num", 1)) in row_items + assert (("customer_name", "Bob"), ("row_num", 2)) in row_items + assert (("customer_name", "Charles"), ("row_num", 3)) in row_items diff --git a/bigquery_storage/snippets/append_rows_proto2.py b/bigquery_storage/snippets/append_rows_proto2.py new file mode 100644 index 00000000000..d610b31faa2 --- /dev/null +++ b/bigquery_storage/snippets/append_rows_proto2.py @@ -0,0 +1,256 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START bigquerystorage_append_rows_raw_proto2] +""" +This code sample demonstrates using the low-level generated client for Python. +""" + +import datetime +import decimal + +from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery_storage_v1 import types, writer +from google.protobuf import descriptor_pb2 + +# If you make updates to the sample_data.proto protocol buffers definition, +# run: +# +# protoc --python_out=. sample_data.proto +# +# from the samples/snippets directory to generate the sample_data_pb2 module. +from . import sample_data_pb2 + + +def append_rows_proto2(project_id: str, dataset_id: str, table_id: str) -> None: + """Create a write stream, write some sample data, and commit the stream.""" + write_client = bigquery_storage_v1.BigQueryWriteClient() + parent = write_client.table_path(project_id, dataset_id, table_id) + write_stream = types.WriteStream() + + # When creating the stream, choose the type. Use the PENDING type to wait + # until the stream is committed before it is visible. 
See: + # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type + write_stream.type_ = types.WriteStream.Type.PENDING + write_stream = write_client.create_write_stream( + parent=parent, write_stream=write_stream + ) + stream_name = write_stream.name + + # Create a template with fields needed for the first request. + request_template = types.AppendRowsRequest() + + # The initial request must contain the stream name. + request_template.write_stream = stream_name + + # So that BigQuery knows how to parse the serialized_rows, generate a + # protocol buffer representation of your message descriptor. + proto_schema = types.ProtoSchema() + proto_descriptor = descriptor_pb2.DescriptorProto() + sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor) + proto_schema.proto_descriptor = proto_descriptor + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.writer_schema = proto_schema + request_template.proto_rows = proto_data + + # Some stream types support an unbounded number of requests. Construct an + # AppendRowsStream to send an arbitrary number of requests to a stream. + append_rows_stream = writer.AppendRowsStream(write_client, request_template) + + # Create a batch of row data by appending proto2 serialized bytes to the + # serialized_rows repeated field. + proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 1 + row.bool_col = True + row.bytes_col = b"Hello, World!" + row.float64_col = float("+inf") + row.int64_col = 123 + row.string_col = "Howdy!" + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 2 + row.bool_col = False + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 3 + row.bytes_col = b"See you later!" + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 4 + row.float64_col = 1000000.125 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 5 + row.int64_col = 67000 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 6 + row.string_col = "Auf Wiedersehen!" + proto_rows.serialized_rows.append(row.SerializeToString()) + + # Set an offset to allow resuming this stream if the connection breaks. + # Keep track of which requests the server has acknowledged and resume the + # stream at the first non-acknowledged message. If the server has already + # processed a message with that offset, it will return an ALREADY_EXISTS + # error, which can be safely ignored. + # + # The first request must always have an offset of 0. + request = types.AppendRowsRequest() + request.offset = 0 + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + response_future_1 = append_rows_stream.send(request) + + # Create a batch of rows containing scalar values that don't directly + # correspond to a protocol buffers scalar type. 
See the documentation for + # the expected data formats: + # https://cloud.google.com/bigquery/docs/write-api#data_type_conversions + proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 7 + date_value = datetime.date(2021, 8, 12) + epoch_value = datetime.date(1970, 1, 1) + delta = date_value - epoch_value + row.date_col = delta.days + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 8 + datetime_value = datetime.datetime(2021, 8, 12, 9, 46, 23, 987456) + row.datetime_col = datetime_value.strftime("%Y-%m-%d %H:%M:%S.%f") + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 9 + row.geography_col = "POINT(-122.347222 47.651111)" + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 10 + numeric_value = decimal.Decimal("1.23456789101112e+6") + row.numeric_col = str(numeric_value) + bignumeric_value = decimal.Decimal("-1.234567891011121314151617181920e+16") + row.bignumeric_col = str(bignumeric_value) + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 11 + time_value = datetime.time(11, 7, 48, 123456) + row.time_col = time_value.strftime("%H:%M:%S.%f") + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 12 + timestamp_value = datetime.datetime( + 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc + ) + epoch_value = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + delta = timestamp_value - epoch_value + row.timestamp_col = int(delta.total_seconds()) * 1000000 + int(delta.microseconds) + proto_rows.serialized_rows.append(row.SerializeToString()) + + # Since this is the second request, you only need to include the row data. + # The name of the stream and protocol buffers DESCRIPTOR is only needed in + # the first request. + request = types.AppendRowsRequest() + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + # Offset must equal the number of rows that were previously sent. + request.offset = 6 + + response_future_2 = append_rows_stream.send(request) + + # Create a batch of rows with STRUCT and ARRAY BigQuery data types. In + # protocol buffers, these correspond to nested messages and repeated + # fields, respectively. 
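Note: the scalar encodings above are easy to get wrong, so before the STRUCT/ARRAY batch below, here is a worked check of the DATE and TIMESTAMP conversions used for rows 7 and 12 (values taken from this sample):

```python
# Worked check of the write-api encodings used in this snippet.
import datetime

# DATE -> days since the Unix epoch (row 7).
days = (datetime.date(2021, 8, 12) - datetime.date(1970, 1, 1)).days
assert days == 18851

# TIMESTAMP -> microseconds since the Unix epoch (row 12).
ts = datetime.datetime(2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc)
epoch = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)
delta = ts - epoch
micros = int(delta.total_seconds()) * 1_000_000 + int(delta.microseconds)
assert micros == 1_628_784_682_987_654
```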
+ proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 13 + row.int64_list.append(1) + row.int64_list.append(2) + row.int64_list.append(3) + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 14 + row.struct_col.sub_int_col = 7 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 15 + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -1 + row.struct_list.append(sub_message) + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -2 + row.struct_list.append(sub_message) + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -3 + row.struct_list.append(sub_message) + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 16 + date_value = datetime.date(2021, 8, 8) + epoch_value = datetime.date(1970, 1, 1) + delta = date_value - epoch_value + row.range_date.start = delta.days + proto_rows.serialized_rows.append(row.SerializeToString()) + + request = types.AppendRowsRequest() + request.offset = 12 + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + # For each request sent, a message is expected in the responses iterable. + # This sample sends 3 requests, therefore expect exactly 3 responses. + response_future_3 = append_rows_stream.send(request) + + # All three requests are in-flight, wait for them to finish being processed + # before finalizing the stream. + print(response_future_1.result()) + print(response_future_2.result()) + print(response_future_3.result()) + + # Shutdown background threads and close the streaming connection. + append_rows_stream.close() + + # A PENDING type stream must be "finalized" before being committed. No new + # records can be written to the stream after this method has been called. + write_client.finalize_write_stream(name=write_stream.name) + + # Commit the stream you created earlier. + batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest() + batch_commit_write_streams_request.parent = parent + batch_commit_write_streams_request.write_streams = [write_stream.name] + write_client.batch_commit_write_streams(batch_commit_write_streams_request) + + print(f"Writes to stream: '{write_stream.name}' have been committed.") + + +# [END bigquerystorage_append_rows_raw_proto2] diff --git a/bigquery_storage/snippets/append_rows_proto2_test.py b/bigquery_storage/snippets/append_rows_proto2_test.py new file mode 100644 index 00000000000..15e5b9d9105 --- /dev/null +++ b/bigquery_storage/snippets/append_rows_proto2_test.py @@ -0,0 +1,128 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import decimal +import pathlib +import random + +from google.cloud import bigquery +import pytest + +from . 
import append_rows_proto2 + +DIR = pathlib.Path(__file__).parent + + +regions = ["US", "non-US"] + + +@pytest.fixture(params=regions) +def sample_data_table( + request: pytest.FixtureRequest, + bigquery_client: bigquery.Client, + project_id: str, + dataset_id: str, + dataset_id_non_us: str, +) -> str: + dataset = dataset_id + if request.param != "US": + dataset = dataset_id_non_us + schema = bigquery_client.schema_from_json(str(DIR / "sample_data_schema.json")) + table_id = f"append_rows_proto2_{random.randrange(10000)}" + full_table_id = f"{project_id}.{dataset}.{table_id}" + table = bigquery.Table(full_table_id, schema=schema) + table = bigquery_client.create_table(table, exists_ok=True) + yield full_table_id + bigquery_client.delete_table(table, not_found_ok=True) + + +def test_append_rows_proto2( + capsys: pytest.CaptureFixture, + bigquery_client: bigquery.Client, + sample_data_table: str, +) -> None: + project_id, dataset_id, table_id = sample_data_table.split(".") + append_rows_proto2.append_rows_proto2( + project_id=project_id, dataset_id=dataset_id, table_id=table_id + ) + out, _ = capsys.readouterr() + assert "have been committed" in out + + rows = bigquery_client.query( + f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`" + ).result() + row_items = [ + # Convert to sorted tuple of items, omitting NULL values, to make + # searching for expected rows easier. + tuple( + sorted( + item for item in row.items() if item[1] is not None and item[1] != [] + ) + ) + for row in rows + ] + + assert ( + ("bool_col", True), + ("bytes_col", b"Hello, World!"), + ("float64_col", float("+inf")), + ("int64_col", 123), + ("row_num", 1), + ("string_col", "Howdy!"), + ) in row_items + assert (("bool_col", False), ("row_num", 2)) in row_items + assert (("bytes_col", b"See you later!"), ("row_num", 3)) in row_items + assert (("float64_col", 1000000.125), ("row_num", 4)) in row_items + assert (("int64_col", 67000), ("row_num", 5)) in row_items + assert (("row_num", 6), ("string_col", "Auf Wiedersehen!")) in row_items + assert (("date_col", datetime.date(2021, 8, 12)), ("row_num", 7)) in row_items + assert ( + ("datetime_col", datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)), + ("row_num", 8), + ) in row_items + assert ( + ("geography_col", "POINT(-122.347222 47.651111)"), + ("row_num", 9), + ) in row_items + assert ( + ("bignumeric_col", decimal.Decimal("-1.234567891011121314151617181920e+16")), + ("numeric_col", decimal.Decimal("1.23456789101112e+6")), + ("row_num", 10), + ) in row_items + assert ( + ("row_num", 11), + ("time_col", datetime.time(11, 7, 48, 123456)), + ) in row_items + assert ( + ("row_num", 12), + ( + "timestamp_col", + datetime.datetime( + 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc + ), + ), + ) in row_items + assert (("int64_list", [1, 2, 3]), ("row_num", 13)) in row_items + assert ( + ("row_num", 14), + ("struct_col", {"sub_int_col": 7}), + ) in row_items + assert ( + ("row_num", 15), + ( + "struct_list", + [{"sub_int_col": -1}, {"sub_int_col": -2}, {"sub_int_col": -3}], + ), + ) in row_items diff --git a/bigquery_storage/snippets/conftest.py b/bigquery_storage/snippets/conftest.py new file mode 100644 index 00000000000..5f1e958183c --- /dev/null +++ b/bigquery_storage/snippets/conftest.py @@ -0,0 +1,65 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Generator + +from google.cloud import bigquery +import pytest +import test_utils.prefixer + +prefixer = test_utils.prefixer.Prefixer("python-bigquery-storage", "samples/snippets") + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_datasets(bigquery_client: bigquery.Client) -> None: + for dataset in bigquery_client.list_datasets(): + if prefixer.should_cleanup(dataset.dataset_id): + bigquery_client.delete_dataset( + dataset, delete_contents=True, not_found_ok=True + ) + + +@pytest.fixture(scope="session") +def bigquery_client() -> bigquery.Client: + return bigquery.Client() + + +@pytest.fixture(scope="session") +def project_id(bigquery_client: bigquery.Client) -> str: + return bigquery_client.project + + +@pytest.fixture(scope="session") +def dataset_id( + bigquery_client: bigquery.Client, project_id: str +) -> Generator[str, None, None]: + dataset_id = prefixer.create_prefix() + full_dataset_id = f"{project_id}.{dataset_id}" + dataset = bigquery.Dataset(full_dataset_id) + bigquery_client.create_dataset(dataset) + yield dataset_id + bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True) + + +@pytest.fixture(scope="session") +def dataset_id_non_us( + bigquery_client: bigquery.Client, project_id: str +) -> Generator[str, None, None]: + dataset_id = prefixer.create_prefix() + full_dataset_id = f"{project_id}.{dataset_id}" + dataset = bigquery.Dataset(full_dataset_id) + dataset.location = "asia-northeast1" + bigquery_client.create_dataset(dataset) + yield dataset_id + bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True) diff --git a/bigquery_storage/snippets/customer_record.proto b/bigquery_storage/snippets/customer_record.proto new file mode 100644 index 00000000000..6c79336b6fa --- /dev/null +++ b/bigquery_storage/snippets/customer_record.proto @@ -0,0 +1,30 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// [START bigquerystorage_append_rows_pending_customer_record] +// The BigQuery Storage API expects protocol buffer data to be encoded in the +// proto2 wire format. This allows it to disambiguate missing optional fields +// from default values without the need for wrapper types. +syntax = "proto2"; + +// Define a message type representing the rows in your table. The message +// cannot contain fields which are not present in the table. +message CustomerRecord { + + optional string customer_name = 1; + + // Use the required keyword for client-side validation of required fields. 
+ required int64 row_num = 2; +} +// [END bigquerystorage_append_rows_pending_customer_record] diff --git a/bigquery_storage/snippets/customer_record_pb2.py b/bigquery_storage/snippets/customer_record_pb2.py new file mode 100644 index 00000000000..457ead954d8 --- /dev/null +++ b/bigquery_storage/snippets/customer_record_pb2.py @@ -0,0 +1,51 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: customer_record.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x15\x63ustomer_record.proto"8\n\x0e\x43ustomerRecord\x12\x15\n\rcustomer_name\x18\x01 \x01(\t\x12\x0f\n\x07row_num\x18\x02 \x02(\x03' +) + + +_CUSTOMERRECORD = DESCRIPTOR.message_types_by_name["CustomerRecord"] +CustomerRecord = _reflection.GeneratedProtocolMessageType( + "CustomerRecord", + (_message.Message,), + { + "DESCRIPTOR": _CUSTOMERRECORD, + "__module__": "customer_record_pb2" + # @@protoc_insertion_point(class_scope:CustomerRecord) + }, +) +_sym_db.RegisterMessage(CustomerRecord) + +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _CUSTOMERRECORD._serialized_start = 25 + _CUSTOMERRECORD._serialized_end = 81 +# @@protoc_insertion_point(module_scope) diff --git a/bigquery_storage/snippets/customer_record_schema.json b/bigquery_storage/snippets/customer_record_schema.json new file mode 100644 index 00000000000..e04b31a7ead --- /dev/null +++ b/bigquery_storage/snippets/customer_record_schema.json @@ -0,0 +1,11 @@ +[ + { + "name": "customer_name", + "type": "STRING" + }, + { + "name": "row_num", + "type": "INTEGER", + "mode": "REQUIRED" + } +] diff --git a/bigquery_storage/snippets/noxfile_config.py b/bigquery_storage/snippets/noxfile_config.py new file mode 100644 index 00000000000..f1fa9e5618b --- /dev/null +++ b/bigquery_storage/snippets/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. 
+ +# You can copy this file into your directory, then it will be imported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} diff --git a/bigquery_storage/snippets/requirements-test.txt b/bigquery_storage/snippets/requirements-test.txt new file mode 100644 index 00000000000..230ca56dc3a --- /dev/null +++ b/bigquery_storage/snippets/requirements-test.txt @@ -0,0 +1,4 @@ +google-cloud-testutils==1.6.4 +pytest===7.4.3; python_version == '3.7' +pytest===8.3.5; python_version == '3.8' +pytest==8.4.1; python_version >= '3.9' diff --git a/bigquery_storage/snippets/requirements.txt b/bigquery_storage/snippets/requirements.txt new file mode 100644 index 00000000000..8a456493526 --- /dev/null +++ b/bigquery_storage/snippets/requirements.txt @@ -0,0 +1,6 @@ +google-cloud-bigquery-storage==2.32.0 +google-cloud-bigquery===3.30.0; python_version <= '3.8' +google-cloud-bigquery==3.35.1; python_version >= '3.9' +pytest===7.4.3; python_version == '3.7' +pytest===8.3.5; python_version == '3.8' +pytest==8.4.1; python_version >= '3.9' diff --git a/bigquery_storage/snippets/sample_data.proto b/bigquery_storage/snippets/sample_data.proto new file mode 100644 index 00000000000..6f0bb93a65c --- /dev/null +++ b/bigquery_storage/snippets/sample_data.proto @@ -0,0 +1,70 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// [START bigquerystorage_append_rows_raw_proto2_definition] +// The BigQuery Storage API expects protocol buffer data to be encoded in the +// proto2 wire format. This allows it to disambiguate missing optional fields +// from default values without the need for wrapper types. +syntax = "proto2"; + +// Define a message type representing the rows in your table. The message +// cannot contain fields which are not present in the table. +message SampleData { + // Use a nested message to encode STRUCT column values. + // + // References to external messages are not allowed. 
Any message definitions + // must be nested within the root message representing row data. + message SampleStruct { + optional int64 sub_int_col = 1; + } + + message RangeValue { + optional int32 start = 1; + optional int32 end = 2; + } + + // The following types map directly between protocol buffers and their + // corresponding BigQuery data types. + optional bool bool_col = 1; + optional bytes bytes_col = 2; + optional double float64_col = 3; + optional int64 int64_col = 4; + optional string string_col = 5; + + // The following data types require some encoding to use. See the + // documentation for the expected data formats: + // https://cloud.google.com/bigquery/docs/write-api#data_type_conversion + optional int32 date_col = 6; + optional string datetime_col = 7; + optional string geography_col = 8; + optional string numeric_col = 9; + optional string bignumeric_col = 10; + optional string time_col = 11; + optional int64 timestamp_col = 12; + + // Use a repeated field to represent a BigQuery ARRAY value. + repeated int64 int64_list = 13; + + // Use a nested message to encode STRUCT and ARRAY values. + optional SampleStruct struct_col = 14; + repeated SampleStruct struct_list = 15; + + // Range types, see: + // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type + optional RangeValue range_date = 16; + + // Use the required keyword for client-side validation of required fields. + required int64 row_num = 17; +} +// [END bigquerystorage_append_rows_raw_proto2_definition] diff --git a/bigquery_storage/snippets/sample_data_pb2.py b/bigquery_storage/snippets/sample_data_pb2.py new file mode 100644 index 00000000000..54ef06d99fa --- /dev/null +++ b/bigquery_storage/snippets/sample_data_pb2.py @@ -0,0 +1,43 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
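The data-type comments in sample_data.proto above map BigQuery types without a direct protobuf equivalent onto encoded proto2 fields. A minimal sketch of those encodings, assuming the generated sample_data_pb2 module (the next file in this diff); the values are illustrative:

import datetime
import decimal

import sample_data_pb2


def encode_row(row_num: int) -> bytes:
    row = sample_data_pb2.SampleData()
    row.row_num = row_num
    # DATE is sent as an int32 count of days since the Unix epoch.
    row.date_col = (datetime.date(2021, 8, 12) - datetime.date(1970, 1, 1)).days
    # TIMESTAMP is sent as int64 microseconds since the Unix epoch.
    moment = datetime.datetime(2021, 8, 12, 9, 30, tzinfo=datetime.timezone.utc)
    row.timestamp_col = int(moment.timestamp()) * 1_000_000
    # DATETIME, NUMERIC, and GEOGRAPHY are sent as string literals; see the
    # data_type_conversion docs linked in the proto for the exact formats.
    row.datetime_col = "2021-08-12 09:30:00"
    row.numeric_col = str(decimal.Decimal("123.456789"))
    row.geography_col = "POINT(-122.35 47.62)"
    return row.SerializeToString()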
+# source: sample_data.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x11sample_data.proto"\xff\x03\n\nSampleData\x12\x10\n\x08\x62ool_col\x18\x01 \x01(\x08\x12\x11\n\tbytes_col\x18\x02 \x01(\x0c\x12\x13\n\x0b\x66loat64_col\x18\x03 \x01(\x01\x12\x11\n\tint64_col\x18\x04 \x01(\x03\x12\x12\n\nstring_col\x18\x05 \x01(\t\x12\x10\n\x08\x64\x61te_col\x18\x06 \x01(\x05\x12\x14\n\x0c\x64\x61tetime_col\x18\x07 \x01(\t\x12\x15\n\rgeography_col\x18\x08 \x01(\t\x12\x13\n\x0bnumeric_col\x18\t \x01(\t\x12\x16\n\x0e\x62ignumeric_col\x18\n \x01(\t\x12\x10\n\x08time_col\x18\x0b \x01(\t\x12\x15\n\rtimestamp_col\x18\x0c \x01(\x03\x12\x12\n\nint64_list\x18\r \x03(\x03\x12,\n\nstruct_col\x18\x0e \x01(\x0b\x32\x18.SampleData.SampleStruct\x12-\n\x0bstruct_list\x18\x0f \x03(\x0b\x32\x18.SampleData.SampleStruct\x12*\n\nrange_date\x18\x10 \x01(\x0b\x32\x16.SampleData.RangeValue\x12\x0f\n\x07row_num\x18\x11 \x02(\x03\x1a#\n\x0cSampleStruct\x12\x13\n\x0bsub_int_col\x18\x01 \x01(\x03\x1a(\n\nRangeValue\x12\r\n\x05start\x18\x01 \x01(\x05\x12\x0b\n\x03\x65nd\x18\x02 \x01(\x05' +) + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "sample_data_pb2", globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _SAMPLEDATA._serialized_start = 22 + _SAMPLEDATA._serialized_end = 533 + _SAMPLEDATA_SAMPLESTRUCT._serialized_start = 456 + _SAMPLEDATA_SAMPLESTRUCT._serialized_end = 491 + _SAMPLEDATA_RANGEVALUE._serialized_start = 493 + _SAMPLEDATA_RANGEVALUE._serialized_end = 533 +# @@protoc_insertion_point(module_scope) diff --git a/bigquery_storage/snippets/sample_data_schema.json b/bigquery_storage/snippets/sample_data_schema.json new file mode 100644 index 00000000000..40efb7122b5 --- /dev/null +++ b/bigquery_storage/snippets/sample_data_schema.json @@ -0,0 +1,81 @@ + +[ + { + "name": "bool_col", + "type": "BOOLEAN" + }, + { + "name": "bytes_col", + "type": "BYTES" + }, + { + "name": "date_col", + "type": "DATE" + }, + { + "name": "datetime_col", + "type": "DATETIME" + }, + { + "name": "float64_col", + "type": "FLOAT" + }, + { + "name": "geography_col", + "type": "GEOGRAPHY" + }, + { + "name": "int64_col", + "type": "INTEGER" + }, + { + "name": "numeric_col", + "type": "NUMERIC" + }, + { + "name": "bignumeric_col", + "type": "BIGNUMERIC" + }, + { + "name": "row_num", + "type": "INTEGER", + "mode": "REQUIRED" + }, + { + "name": "string_col", + "type": "STRING" + }, + { + "name": "time_col", + "type": "TIME" + }, + { + "name": "timestamp_col", + "type": "TIMESTAMP" + }, + { + "name": "int64_list", + "type": "INTEGER", + "mode": "REPEATED" + }, + { + "name": "struct_col", + "type": "RECORD", + "fields": [ + {"name": "sub_int_col", "type": "INTEGER"} + ] + }, + { + "name": "struct_list", + "type": "RECORD", + "fields": [ + {"name": "sub_int_col", "type": "INTEGER"} + ], + "mode": "REPEATED" + }, + { + "name": "range_date", + "type": "RANGE", + "rangeElementType": {"type": "DATE"} + } + ] diff --git a/bigquery_storage/to_dataframe/__init__.py b/bigquery_storage/to_dataframe/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git 
a/bigquery_storage/to_dataframe/jupyter_test.py b/bigquery_storage/to_dataframe/jupyter_test.py new file mode 100644 index 00000000000..c2046b8c80e --- /dev/null +++ b/bigquery_storage/to_dataframe/jupyter_test.py @@ -0,0 +1,67 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import IPython +from IPython.terminal import interactiveshell +from IPython.testing import tools +import pytest + +# Ignore semicolon lint warning because semicolons are used in notebooks +# flake8: noqa E703 + + +@pytest.fixture(scope="session") +def ipython(): + config = tools.default_config() + config.TerminalInteractiveShell.simple_prompt = True + shell = interactiveshell.TerminalInteractiveShell.instance(config=config) + return shell + + +@pytest.fixture() +def ipython_interactive(request, ipython): + """Activate IPython's builtin hooks + + for the duration of the test scope. + """ + with ipython.builtin_trap: + yield ipython + + +def _strip_region_tags(sample_text): + """Remove blank lines and region tags from sample text""" + magic_lines = [ + line for line in sample_text.split("\n") if len(line) > 0 and "# [" not in line + ] + return "\n".join(magic_lines) + + +def test_jupyter_tutorial(ipython): + ip = IPython.get_ipython() + ip.extension_manager.load_extension("google.cloud.bigquery") + + # This code sample intentionally queries a lot of data to demonstrate the + # speed-up of using the BigQuery Storage API to download the results. + sample = """ + # [START bigquerystorage_jupyter_tutorial_query_default] + %%bigquery tax_forms + SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012` + # [END bigquerystorage_jupyter_tutorial_query_default] + """ + result = ip.run_cell(_strip_region_tags(sample)) + result.raise_error() # Throws an exception if the cell failed. + + assert "tax_forms" in ip.user_ns # verify that variable exists diff --git a/bigquery_storage/to_dataframe/noxfile_config.py b/bigquery_storage/to_dataframe/noxfile_config.py new file mode 100644 index 00000000000..f1fa9e5618b --- /dev/null +++ b/bigquery_storage/to_dataframe/noxfile_config.py @@ -0,0 +1,42 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be imported from +# the noxfile.py. 
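jupyter_test.py above drives the %%bigquery cell magic through a scripted IPython shell; in an interactive notebook the same flow is just two cells. A sketch, written as comments because cell magics are not plain Python (the query and variable name mirror the test):

# %load_ext google.cloud.bigquery
#
# %%bigquery tax_forms
# SELECT * FROM `bigquery-public-data.irs_990.irs_990_2012`
#
# The magic stores the query result as a pandas DataFrame named tax_forms:
# tax_forms.head()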
+ +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} diff --git a/bigquery_storage/to_dataframe/read_query_results.py b/bigquery_storage/to_dataframe/read_query_results.py new file mode 100644 index 00000000000..e947e8afe93 --- /dev/null +++ b/bigquery_storage/to_dataframe/read_query_results.py @@ -0,0 +1,49 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas + + +def read_query_results() -> pandas.DataFrame: + # [START bigquerystorage_pandas_tutorial_read_query_results] + from google.cloud import bigquery + + bqclient = bigquery.Client() + + # Download query results. + query_string = """ + SELECT + CONCAT( + 'https://stackoverflow.com/questions/', + CAST(id as STRING)) as url, + view_count + FROM `bigquery-public-data.stackoverflow.posts_questions` + WHERE tags like '%google-bigquery%' + ORDER BY view_count DESC + """ + + dataframe = ( + bqclient.query(query_string) + .result() + .to_dataframe( + # Optionally, explicitly request to use the BigQuery Storage API. As of + # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage + # API is used by default. + create_bqstorage_client=True, + ) + ) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_query_results] + + return dataframe diff --git a/bigquery_storage/to_dataframe/read_query_results_test.py b/bigquery_storage/to_dataframe/read_query_results_test.py new file mode 100644 index 00000000000..b5cb5517401 --- /dev/null +++ b/bigquery_storage/to_dataframe/read_query_results_test.py @@ -0,0 +1,23 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
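read_query_results.py above downloads query results with to_dataframe; that method also accepts a progress_bar_type argument, which is why tqdm and ipywidgets are pinned later in to_dataframe/requirements.txt. A small, illustrative variation, not part of this diff:

import pandas
from google.cloud import bigquery


def read_with_progress(query_string: str) -> pandas.DataFrame:
    bqclient = bigquery.Client()
    return (
        bqclient.query(query_string)
        .result()
        .to_dataframe(
            create_bqstorage_client=True,
            # Renders a tqdm progress bar while rows download; use
            # "tqdm_notebook" when running inside Jupyter.
            progress_bar_type="tqdm",
        )
    )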
+# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from . import read_query_results + + +def test_read_query_results(capsys: pytest.CaptureFixture) -> None: + read_query_results.read_query_results() + out, _ = capsys.readouterr() + assert "stackoverflow" in out diff --git a/bigquery_storage/to_dataframe/read_table_bigquery.py b/bigquery_storage/to_dataframe/read_table_bigquery.py new file mode 100644 index 00000000000..7a69a64d77d --- /dev/null +++ b/bigquery_storage/to_dataframe/read_table_bigquery.py @@ -0,0 +1,45 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import pandas + + +def read_table() -> pandas.DataFrame: + # [START bigquerystorage_pandas_tutorial_read_table] + from google.cloud import bigquery + + bqclient = bigquery.Client() + + # Download a table. + table = bigquery.TableReference.from_string( + "bigquery-public-data.utility_us.country_code_iso" + ) + rows = bqclient.list_rows( + table, + selected_fields=[ + bigquery.SchemaField("country_name", "STRING"), + bigquery.SchemaField("fips_code", "STRING"), + ], + ) + dataframe = rows.to_dataframe( + # Optionally, explicitly request to use the BigQuery Storage API. As of + # google-cloud-bigquery version 1.26.0 and above, the BigQuery Storage + # API is used by default. + create_bqstorage_client=True, + ) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_table] + + return dataframe diff --git a/bigquery_storage/to_dataframe/read_table_bigquery_test.py b/bigquery_storage/to_dataframe/read_table_bigquery_test.py new file mode 100644 index 00000000000..5b45c4d5163 --- /dev/null +++ b/bigquery_storage/to_dataframe/read_table_bigquery_test.py @@ -0,0 +1,23 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from . import read_table_bigquery + + +def test_read_table(capsys: pytest.CaptureFixture) -> None: + read_table_bigquery.read_table() + out, _ = capsys.readouterr() + assert "country_name" in out diff --git a/bigquery_storage/to_dataframe/read_table_bqstorage.py b/bigquery_storage/to_dataframe/read_table_bqstorage.py new file mode 100644 index 00000000000..ce1cd3872ae --- /dev/null +++ b/bigquery_storage/to_dataframe/read_table_bqstorage.py @@ -0,0 +1,74 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pandas as pd + + +def read_table(your_project_id: str) -> pd.DataFrame: + original_your_project_id = your_project_id + # [START bigquerystorage_pandas_tutorial_read_session] + your_project_id = "project-for-read-session" + # [END bigquerystorage_pandas_tutorial_read_session] + your_project_id = original_your_project_id + + # [START bigquerystorage_pandas_tutorial_read_session] + import pandas + + from google.cloud import bigquery_storage + from google.cloud.bigquery_storage import types + + bqstorageclient = bigquery_storage.BigQueryReadClient() + + project_id = "bigquery-public-data" + dataset_id = "new_york_trees" + table_id = "tree_species" + table = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}" + + # Select columns to read with read options. If no read options are + # specified, the whole table is read. + read_options = types.ReadSession.TableReadOptions( + selected_fields=["species_common_name", "fall_color"] + ) + + parent = "projects/{}".format(your_project_id) + + requested_session = types.ReadSession( + table=table, + # Avro is also supported, but the Arrow data format is optimized to + # work well with column-oriented data structures such as pandas + # DataFrames. + data_format=types.DataFormat.ARROW, + read_options=read_options, + ) + read_session = bqstorageclient.create_read_session( + parent=parent, + read_session=requested_session, + max_stream_count=1, + ) + + # This example reads from only a single stream. Read from multiple streams + # to fetch data faster. Note that the session may not contain any streams + # if there are no rows to read. + stream = read_session.streams[0] + reader = bqstorageclient.read_rows(stream.name) + + # Parse all Arrow blocks and create a dataframe. + frames = [] + for message in reader.rows().pages: + frames.append(message.to_dataframe()) + dataframe = pandas.concat(frames) + print(dataframe.head()) + # [END bigquerystorage_pandas_tutorial_read_session] + + return dataframe diff --git a/bigquery_storage/to_dataframe/read_table_bqstorage_test.py b/bigquery_storage/to_dataframe/read_table_bqstorage_test.py new file mode 100644 index 00000000000..7b46a6b180a --- /dev/null +++ b/bigquery_storage/to_dataframe/read_table_bqstorage_test.py @@ -0,0 +1,23 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from . 
import read_table_bqstorage + + +def test_read_table(capsys: pytest.CaptureFixture, project_id: str) -> None: + read_table_bqstorage.read_table(your_project_id=project_id) + out, _ = capsys.readouterr() + assert "species_common_name" in out diff --git a/bigquery_storage/to_dataframe/requirements-test.txt b/bigquery_storage/to_dataframe/requirements-test.txt new file mode 100644 index 00000000000..7561ed55ce2 --- /dev/null +++ b/bigquery_storage/to_dataframe/requirements-test.txt @@ -0,0 +1,3 @@ +pytest===7.4.3; python_version == '3.7' +pytest===8.3.5; python_version == '3.8' +pytest==8.4.1; python_version >= '3.9' diff --git a/bigquery_storage/to_dataframe/requirements.txt b/bigquery_storage/to_dataframe/requirements.txt new file mode 100644 index 00000000000..e3b75fdaf5f --- /dev/null +++ b/bigquery_storage/to_dataframe/requirements.txt @@ -0,0 +1,19 @@ +google-auth==2.40.3 +google-cloud-bigquery-storage==2.32.0 +google-cloud-bigquery===3.30.0; python_version <= '3.8' +google-cloud-bigquery==3.35.1; python_version >= '3.9' +pyarrow===12.0.1; python_version == '3.7' +pyarrow===17.0.0; python_version == '3.8' +pyarrow==21.0.0; python_version >= '3.9' +ipython===7.31.1; python_version == '3.7' +ipython===8.10.0; python_version == '3.8' +ipython===8.18.1; python_version == '3.9' +ipython===8.33.0; python_version == '3.10' +ipython==9.4.0; python_version >= '3.11' +ipywidgets==8.1.7 +pandas===1.3.5; python_version == '3.7' +pandas===2.0.3; python_version == '3.8' +pandas==2.3.1; python_version >= '3.9' +tqdm==4.67.1 +db-dtypes===1.4.2; python_version <= '3.8' +db-dtypes==1.4.3; python_version >= '3.9'
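read_table_bqstorage.py above reads from a single stream and notes that reading from multiple streams fetches data faster. A minimal sketch of that fan-out against the same public table; the function name, stream count, and thread pool are illustrative, not part of the samples:

from concurrent import futures

import pandas
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types


def read_table_parallel(your_project_id: str) -> pandas.DataFrame:
    client = bigquery_storage.BigQueryReadClient()
    table = "projects/bigquery-public-data/datasets/new_york_trees/tables/tree_species"
    session = client.create_read_session(
        parent=f"projects/{your_project_id}",
        read_session=types.ReadSession(
            table=table,
            data_format=types.DataFormat.ARROW,
        ),
        max_stream_count=4,  # The server may return fewer streams than this.
    )
    if not session.streams:
        # The session contains no streams when there are no rows to read.
        return pandas.DataFrame()

    def download(stream: types.ReadStream) -> pandas.DataFrame:
        # Each stream is read independently, so downloads can run in parallel.
        reader = client.read_rows(stream.name)
        frames = [page.to_dataframe() for page in reader.rows().pages]
        return pandas.concat(frames) if frames else pandas.DataFrame()

    with futures.ThreadPoolExecutor(max_workers=len(session.streams)) as pool:
        return pandas.concat(list(pool.map(download, session.streams)))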