feat: Support write api as loading option #1617
Changes from all commits
@@ -255,6 +255,7 @@ def __init__(
            session=self,
            bqclient=self._clients_provider.bqclient,
            storage_manager=self._temp_storage_manager,
            write_client=self._clients_provider.bqstoragewriteclient,
            default_index_type=self._default_index_type,
            scan_index_uniqueness=self._strictly_ordered,
            force_total_order=self._strictly_ordered,
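Note: this diff does not show how the bqstoragewriteclient property referenced above is constructed. Below is a hypothetical sketch of a lazily-built client in the clients provider, purely for illustration; the attribute names and credential plumbing are assumptions, not code from this PR.

    from google.cloud import bigquery_storage_v1

    class ClientsProvider:  # hypothetical shape, for illustration only
        def __init__(self, credentials=None):
            self._credentials = credentials
            self._bqstoragewriteclient = None

        @property
        def bqstoragewriteclient(self) -> bigquery_storage_v1.BigQueryWriteClient:
            # Build and cache the Storage Write API client on first use.
            if self._bqstoragewriteclient is None:
                self._bqstoragewriteclient = bigquery_storage_v1.BigQueryWriteClient(
                    credentials=self._credentials
                )
            return self._bqstoragewriteclient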
@@ -731,7 +732,9 @@ def read_pandas(
                    workload is such that you exhaust the BigQuery load job
                    quota and your data cannot be embedded in SQL due to size or
                    data type limitations.

                * "bigquery_write":
                    [Preview] Use the BigQuery Storage Write API. This feature
                    is in public preview.

        Returns:
            An equivalent bigframes.pandas.(DataFrame/Series/Index) object
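For orientation, a minimal usage sketch of the option documented above. It assumes the top-level bigframes.pandas.read_pandas forwards write_engine to the session method changed in this PR; the sample DataFrame is illustrative.

    import pandas as pd
    import bigframes.pandas as bpd

    pdf = pd.DataFrame({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

    # Route the local data through the BigQuery Storage Write API instead of a
    # load job or the legacy streaming API.
    bf_df = bpd.read_pandas(pdf, write_engine="bigquery_write")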
@@ -805,6 +808,10 @@ def _read_pandas(
            return self._loader.read_pandas(
                pandas_dataframe, method="stream", api_name=api_name
            )
        elif write_engine == "bigquery_write":
            return self._loader.read_pandas(
                pandas_dataframe, method="write", api_name=api_name
            )
        else:
            raise ValueError(f"Got unexpected write_engine '{write_engine}'")

Review comment (on the new "bigquery_write" branch): Could you also add this to the various docstrings? Let's mark the "bigquery_write" option as [Preview] in the docs, too.

Reply: Added to the only docstring that enumerates the engines.
@@ -23,6 +23,7 @@
import typing
from typing import (
    Dict,
    Generator,
    Hashable,
    IO,
    Iterable,
@@ -36,12 +37,13 @@
import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
import google.api_core.exceptions
from google.cloud import bigquery_storage_v1
import google.cloud.bigquery as bigquery
import google.cloud.bigquery.table
from google.cloud.bigquery_storage_v1 import types as bq_storage_types
import pandas
import pyarrow as pa

from bigframes.core import local_data, utils
from bigframes.core import guid, local_data, utils
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.core.schema as schemata
@@ -142,13 +144,15 @@ def __init__(
        self,
        session: bigframes.session.Session,
        bqclient: bigquery.Client,
        write_client: bigquery_storage_v1.BigQueryWriteClient,
        storage_manager: bigframes.session.temporary_storage.TemporaryStorageManager,
        default_index_type: bigframes.enums.DefaultIndexKind,
        scan_index_uniqueness: bool,
        force_total_order: bool,
        metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
    ):
        self._bqclient = bqclient
        self._write_client = write_client
        self._storage_manager = storage_manager
        self._default_index_type = default_index_type
        self._scan_index_uniqueness = scan_index_uniqueness
@@ -165,7 +169,7 @@ def __init__(
    def read_pandas(
        self,
        pandas_dataframe: pandas.DataFrame,
        method: Literal["load", "stream"],
        method: Literal["load", "stream", "write"],
        api_name: str,
    ) -> dataframe.DataFrame:
        # TODO: Push this into from_pandas, along with index flag
@@ -183,6 +187,8 @@ def read_pandas(
            array_value = self.load_data(managed_data, api_name=api_name)
        elif method == "stream":
            array_value = self.stream_data(managed_data)
        elif method == "write":
            array_value = self.write_data(managed_data)
        else:
            raise ValueError(f"Unsupported read method {method}")
@@ -198,7 +204,7 @@ def load_data(
        self, data: local_data.ManagedArrowTable, api_name: Optional[str] = None
    ) -> core.ArrayValue:
        """Load managed data into bigquery"""
        ordering_col = "bf_load_job_offsets"
        ordering_col = guid.generate_guid("load_offsets_")

        # JSON support incomplete
        for item in data.schema.items:
@@ -244,7 +250,7 @@ def load_data(

    def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
        """Load managed data into bigquery"""
        ordering_col = "bf_stream_job_offsets"
        ordering_col = guid.generate_guid("stream_offsets_")
        schema_w_offsets = data.schema.append(
            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
        )
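The two hunks above replace fixed offset-column names with generated ones, so the internal ordering column cannot collide with a user column of the same name. The guid helper itself is not shown in this diff; a rough stand-in that illustrates the idea (the real bigframes.core.guid.generate_guid may use a session-local counter rather than uuid):

    import uuid

    def generate_guid(prefix: str = "col_") -> str:
        # Stand-in: produce a column name that will not clash with user columns.
        return f"{prefix}{uuid.uuid4().hex[:8]}"

    ordering_col = generate_guid("load_offsets_")  # e.g. "load_offsets_3fa91c2b"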
@@ -277,6 +283,61 @@ def stream_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
            n_rows=data.data.num_rows,
        ).drop_columns([ordering_col])

    def write_data(self, data: local_data.ManagedArrowTable) -> core.ArrayValue:
        """Load managed data into bigquery"""
        ordering_col = guid.generate_guid("stream_offsets_")
        schema_w_offsets = data.schema.append(
            schemata.SchemaItem(ordering_col, bigframes.dtypes.INT_DTYPE)
        )
        bq_schema = schema_w_offsets.to_bigquery(_STREAM_JOB_TYPE_OVERRIDES)
        bq_table_ref = self._storage_manager.create_temp_table(
            bq_schema, [ordering_col]
        )

        requested_stream = bq_storage_types.stream.WriteStream()
        requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED  # type: ignore

        stream_request = bq_storage_types.CreateWriteStreamRequest(
            parent=bq_table_ref.to_bqstorage(), write_stream=requested_stream
        )
        stream = self._write_client.create_write_stream(request=stream_request)

        def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
            schema, batches = data.to_arrow(
                offsets_col=ordering_col, duration_type="int"
            )
            offset = 0
            for batch in batches:
                request = bq_storage_types.AppendRowsRequest(
                    write_stream=stream.name, offset=offset
                )
                request.arrow_rows.writer_schema.serialized_schema = (
                    schema.serialize().to_pybytes()
                )
                request.arrow_rows.rows.serialized_record_batch = (
                    batch.serialize().to_pybytes()
                )
                offset += batch.num_rows
                yield request

        for response in self._write_client.append_rows(requests=request_gen()):
            if response.row_errors:
                raise ValueError(
                    f"Problem loading at least one row from DataFrame: {response.row_errors}. {constants.FEEDBACK_LINK}"
                )
        # This step isn't strictly necessary in COMMITTED mode, but avoids max active stream limits
        response = self._write_client.finalize_write_stream(name=stream.name)
        assert response.row_count == data.data.num_rows

        destination_table = self._bqclient.get_table(bq_table_ref)

        return core.ArrayValue.from_table(
            table=destination_table,
            schema=schema_w_offsets,
            session=self._session,
            offsets_col=ordering_col,
            n_rows=data.data.num_rows,
        ).drop_columns([ordering_col])

    def _start_generic_job(self, job: formatting_helpers.GenericJob):
        if bigframes.options.display.progress_bar is not None:
            formatting_helpers.wait_for_job(

Review comment (anchored at the destination_table = self._bqclient.get_table(bq_table_ref) line): Do we need to do something here to finalize the stream? https://cloud.google.com/bigquery/docs/write-api-streaming

Reply: Not strictly necessary, but it can avoid limits, per the docs: "This step is optional in committed type, but helps to prevent exceeding the limit on active streams."

Reply: Added finalize in a new iteration.
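For readers unfamiliar with the Storage Write API flow that write_data follows (create a COMMITTED stream, append serialized Arrow record batches at explicit offsets, then finalize), here is a condensed, self-contained sketch. It mirrors the client calls made above but strips the bigframes-specific plumbing; the table path and Arrow table are illustrative assumptions.

    from google.cloud import bigquery_storage_v1
    from google.cloud.bigquery_storage_v1 import types as bq_storage_types
    import pyarrow as pa

    def append_arrow_table(
        write_client: bigquery_storage_v1.BigQueryWriteClient,
        table_path: str,  # e.g. "projects/my-proj/datasets/my_ds/tables/my_table"
        arrow_table: pa.Table,
    ) -> int:
        """Append an Arrow table to an existing BigQuery table over a COMMITTED stream."""
        requested_stream = bq_storage_types.stream.WriteStream()
        requested_stream.type_ = bq_storage_types.stream.WriteStream.Type.COMMITTED

        stream = write_client.create_write_stream(
            request=bq_storage_types.CreateWriteStreamRequest(
                parent=table_path, write_stream=requested_stream
            )
        )

        def requests():
            offset = 0
            for batch in arrow_table.to_batches():
                request = bq_storage_types.AppendRowsRequest(
                    write_stream=stream.name, offset=offset
                )
                # Each request carries the Arrow schema plus one serialized batch.
                request.arrow_rows.writer_schema.serialized_schema = (
                    arrow_table.schema.serialize().to_pybytes()
                )
                request.arrow_rows.rows.serialized_record_batch = (
                    batch.serialize().to_pybytes()
                )
                offset += batch.num_rows
                yield request

        for response in write_client.append_rows(requests=requests()):
            if response.row_errors:
                raise ValueError(f"append_rows reported row errors: {response.row_errors}")

        # Optional for COMMITTED streams, but releases the stream against quota limits.
        final = write_client.finalize_write_stream(name=stream.name)
        return final.row_count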