From 2d3fd4b4dabc1141d65b747edee54db03218c37b Mon Sep 17 00:00:00 2001
From: Huan Chen <142538604+Genesis929@users.noreply.github.com>
Date: Tue, 29 Oct 2024 16:22:56 -0700
Subject: [PATCH 01/20] chore: test fix. (#1121)

---
 tests/system/small/test_session.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index 4b48915d2d..580ae80ef1 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -399,7 +399,7 @@ def test_read_gbq_twice_with_same_timestamp(session, penguins_table_id):
 )
 def test_read_gbq_on_linked_dataset_warns(session, source_table):
     with warnings.catch_warnings(record=True) as warned:
-        session.read_gbq(source_table)
+        session.read_gbq(source_table, use_cache=False)
     assert len(warned) == 1
     assert warned[0].category == bigframes.exceptions.TimeTravelDisabledWarning

From 108f4a98463596d8df6d381b3580eb72eab41b6e Mon Sep 17 00:00:00 2001
From: rey-esp
Date: Wed, 30 Oct 2024 11:41:36 -0500
Subject: [PATCH 02/20] docs: add snippet for Linear Regression tutorial
 Predict Outcomes section (#1101)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* docs: add python translation for predicting outcomes

* add '6' to previous snippet comment

* fix data to predict

* add expected results

* replace model with dataframe

* update dataframe to drop nulls in body mass column

* update df

* Update samples/snippets/linear_regression_tutorial_test.py

* complete snippet

---------

Co-authored-by: Tim Sweña (Swast)
---
 .../linear_regression_tutorial_test.py        | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/samples/snippets/linear_regression_tutorial_test.py b/samples/snippets/linear_regression_tutorial_test.py
index 9a4908dbf5..452d88746d 100644
--- a/samples/snippets/linear_regression_tutorial_test.py
+++ b/samples/snippets/linear_regression_tutorial_test.py
@@ -52,9 +52,34 @@ def test_linear_regression(random_model_id: str) -> None:
     # Expected output results:
     # index  mean_absolute_error  mean_squared_error  mean_squared_log_error  median_absolute_error  r2_score  explained_variance
     # 0      227.012237           81838.159892        0.00507                 173.080816             0.872377  0.872377
-    # 1 rows x columns
+    # 1 rows x 6 columns
     # [END bigquery_dataframes_bqml_linear_evaluate]
+    # [START bigquery_dataframes_bqml_linear_predict]
+    # Select the model you'll use for predictions. `read_gbq_model` loads
+    # model data from BigQuery, but you could also use the `model` object
+    # from previous steps.
+    model = bpd.read_gbq_model(
+        your_model_id,
+        # For example: "bqml_tutorial.penguins_model",
+    )
+
+    # Load data from BigQuery
+    bq_df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")
+
+    # Use 'contains' function to filter by island containing the string
+    # "Biscoe".
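+    # (`str.contains` does substring matching here, so any island value that
+    # contains "Biscoe" is kept.)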
+    biscoe_data = bq_df.loc[bq_df["island"].str.contains("Biscoe")]
+
+    result = model.predict(biscoe_data)
+
+    # Expected output results:
+    #     predicted_body_mass_g            species              island  culmen_length_mm  culmen_depth_mm  body_mass_g  flipper_length_mm  sex
+    # 23  4681.782896  Gentoo penguin (Pygoscelis papua)  Biscoe
+    # 332 4740.7907    Gentoo penguin (Pygoscelis papua)  Biscoe  46.2  14.4  214.0  4650.0
+    # 160 4731.310452  Gentoo penguin (Pygoscelis papua)  Biscoe  44.5  14.3  216.0  4100.0
+    # [END bigquery_dataframes_bqml_linear_predict]
     assert feature_columns is not None
     assert label_columns is not None
     assert model is not None
     assert score is not None
+    assert result is not None

From 9867a788e7c46bf0850cacbe7cd41a11fea32d6b Mon Sep 17 00:00:00 2001
From: Arwa Sharif <146148342+arwas11@users.noreply.github.com>
Date: Wed, 30 Oct 2024 11:58:34 -0500
Subject: [PATCH 03/20] docs: update GroupBy docstrings (#1103)

---
 .../pandas/core/groupby/__init__.py           | 86 ++++++++++++-------
 1 file changed, 56 insertions(+), 30 deletions(-)

diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py
index 6011dbfe5b..54320d8116 100644
--- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py
+++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py
@@ -49,7 +49,8 @@ def any(self):
         [2 rows x 2 columns]

         Returns:
-            Series or DataFrame: DataFrame or Series of boolean values,
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                DataFrame or Series of boolean values,
             where a value is True if any element is True within its
             respective group; otherwise False.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -87,7 +88,8 @@ def all(self):
         [2 rows x 2 columns]

         Returns:
-            Series or DataFrame: DataFrame or Series of boolean values,
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                DataFrame or Series of boolean values,
             where a value is True if all elements are True within its
             respective group; otherwise False.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -126,7 +128,8 @@ def count(self):
         [2 rows x 2 columns]

         Returns:
-            Series or DataFrame: Count of values within each group.
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                Count of values within each group.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -181,7 +184,8 @@ def mean(
                 Include only float, int, boolean columns.

         Returns:
-            pandas.Series or pandas.DataFrame: Mean of groups.
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                Mean of groups.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -228,7 +232,8 @@ def median(
                 Calculate the exact median instead of an approximation.

         Returns:
-            pandas.Series or pandas.DataFrame: Median of groups.
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                Median of groups.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -259,7 +264,8 @@ def quantile(self, q=0.5, *, numeric_only: bool = False):
                 Include only `float`, `int` or `boolean` data.

         Returns:
-            Series or DataFrame: Return type determined by caller of GroupBy object.
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                Return type determined by caller of GroupBy object.
         """
         raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
@@ -305,7 +311,8 @@ def std(
                 Include only `float`, `int` or `boolean` data.

         Returns:
-            Series or DataFrame: Standard deviation of values within each group.
+            bigframes.pandas.DataFrame or bigframes.pandas.Series:
+                Standard deviation of values within each group.
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -351,7 +358,7 @@ def var( Include only `float`, `int` or `boolean` data. Returns: - Series or DataFrame + bigframes.pandas.DataFrame or bigframes.pandas.Series: Variance of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -388,7 +395,7 @@ def skew( Include only `float`, `int` or `boolean` data. Returns: - Series or DataFrame + bigframes.pandas.DataFrame or bigframes.pandas.Series: Variance of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -421,7 +428,7 @@ def kurt( Include only `float`, `int` or `boolean` data. Returns: - Series or DataFrame + bigframes.pandas.DataFrame or bigframes.pandas.Series: Variance of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -454,7 +461,7 @@ def kurtosis( Include only `float`, `int` or `boolean` data. Returns: - Series or DataFrame + bigframes.pandas.DataFrame or bigframes.pandas.Series: Variance of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -502,7 +509,8 @@ def sum( than ``min_count`` and non-NA values are present, the result will be NA. Returns: - Series or DataFrame: Computed sum of values within each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Computed sum of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -534,7 +542,8 @@ def prod(self, numeric_only: bool = False, min_count: int = 0): than ``min_count`` and non-NA values are present, the result will be NA. Returns: - Series or DataFrame: Computed prod of values within each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Computed prod of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -582,7 +591,8 @@ def min( than ``min_count`` and non-NA values are present, the result will be NA. Returns: - Series or DataFrame: Computed min of values within each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Computed min of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -629,7 +639,8 @@ def max( than ``min_count`` and non-NA values are present, the result will be NA. Returns: - Series or DataFrame: Computed max of values within each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Computed max of values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -667,7 +678,8 @@ def cumcount(self, ascending: bool = True): If False, number in reverse, from length of group - 1 to 0. Returns: - Series: Sequence number of each element within each group. + bigframes.pandas.Series: + Sequence number of each element within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -705,7 +717,8 @@ def cumprod(self, *args, **kwargs): [3 rows x 2 columns] Returns: - Series or DataFrame: Cumulative product for each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Cumulative product for each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -743,7 +756,8 @@ def cumsum(self, *args, **kwargs): [3 rows x 2 columns] Returns: - Series or DataFrame: Cumulative sum for each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Cumulative sum for each group. 
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -781,7 +795,8 @@ def cummin(self, *args, numeric_only: bool = False, **kwargs): [3 rows x 2 columns] Returns: - Series or DataFrame: Cumulative min for each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Cumulative min for each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -819,7 +834,8 @@ def cummax(self, *args, numeric_only: bool = False, **kwargs): [3 rows x 2 columns] Returns: - Series or DataFrame: Cumulative max for each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Cumulative max for each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -866,7 +882,8 @@ def diff(self): [7 rows x 2 columns] Returns: - Series or DataFrame: First differences. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + First differences. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -910,7 +927,8 @@ def shift(self, periods: int = 1): Number of periods to shift. Returns: - Series or DataFrame: Object shifted within each group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Object shifted within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -947,7 +965,8 @@ def rolling(self, *args, **kwargs): to the size of the window. Returns: - Series or DataFrame: Return a new grouper with our rolling appended. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + Return a new grouper with our rolling appended. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -973,7 +992,8 @@ def expanding(self, *args, **kwargs): dtype: Int64 Returns: - Series or DataFrame: An expanding grouper, providing expanding functionality per group. + bigframes.pandas.DataFrame or bigframes.pandas.Series: + An expanding grouper, providing expanding functionality per group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1007,7 +1027,8 @@ def agg(self, func): - list of function names, e.g. ``['sum', 'mean']`` Returns: - Series or DataFrame + bigframes.pandas.Series: + A BigQuery Series. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1039,7 +1060,8 @@ def aggregate(self, func): - list of function names, e.g. ``['sum', 'mean']`` Returns: - Series or DataFrame + bigframes.pandas.Series: + A BigQuery Series. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1061,7 +1083,8 @@ def nunique(self): dtype: Int64 Returns: - Series: Number of unique values within each group. + bigframes.pandas.Series: + Number of unique values within each group. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1122,7 +1145,8 @@ def agg(self, func, **kwargs): aggregations via Named Aggregation. See ``func`` entry. Returns: - DataFrame + bigframes.pandas.DataFrame: + A BigQuery DataFrame. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1181,7 +1205,8 @@ def aggregate(self, func, **kwargs): aggregations via Named Aggregation. See ``func`` entry. Returns: - DataFrame + bigframes.pandas.DataFrame: + A BigQuery DataFrame. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1209,6 +1234,7 @@ def nunique(self): [3 rows x 2 columns] Returns: - DataFrame + bigframes.pandas.DataFrame: + Number of unique values within a BigQuery DataFrame. 
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 14e32b51c11c1718128f49ef94e754afc0ac0618 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Wed, 30 Oct 2024 11:40:00 -0700 Subject: [PATCH 04/20] fix: Fix Series.to_frame generating string label instead of int where name is None (#1118) --- bigframes/core/indexers.py | 4 ++-- bigframes/series.py | 2 +- bigframes/session/__init__.py | 2 +- tests/system/small/test_series.py | 9 +++++++++ 4 files changed, 13 insertions(+), 4 deletions(-) diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index 6c65077528..f03011c054 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -375,7 +375,7 @@ def _perform_loc_list_join( if isinstance(series_or_dataframe, bigframes.series.Series): _struct_accessor_check_and_warn(series_or_dataframe, keys_index) original_name = series_or_dataframe.name - name = series_or_dataframe.name if series_or_dataframe.name is not None else "0" + name = series_or_dataframe.name if series_or_dataframe.name is not None else 0 result = typing.cast( bigframes.series.Series, series_or_dataframe.to_frame()._perform_join_by_index( @@ -468,7 +468,7 @@ def _iloc_getitem_series_or_dataframe( if isinstance(series_or_dataframe, bigframes.series.Series): original_series_name = series_or_dataframe.name series_name = ( - original_series_name if original_series_name is not None else "0" + original_series_name if original_series_name is not None else 0 ) df = series_or_dataframe.to_frame() original_index_names = df.index.names diff --git a/bigframes/series.py b/bigframes/series.py index d311742861..571eee1534 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1651,7 +1651,7 @@ def to_frame(self, name: blocks.Label = None) -> bigframes.dataframe.DataFrame: provided_name = name if name else self.name # To be consistent with Pandas, it assigns 0 as the column name if missing. 0 is the first element of RangeIndex. 
         block = self._block.with_column_labels(
-            [provided_name] if provided_name else ["0"]
+            [provided_name] if provided_name else [0]
         )
         return bigframes.dataframe.DataFrame(block)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index f2f41b8463..004f5d322b 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -922,7 +922,7 @@ def read_pickle(
         if isinstance(pandas_obj, pandas.Series):
             if pandas_obj.name is None:
-                pandas_obj.name = "0"
+                pandas_obj.name = 0
             bigframes_df = self._read_pandas(pandas_obj.to_frame(), "read_pickle")
             return bigframes_df[bigframes_df.columns[0]]
         return self._read_pandas(pandas_obj, "read_pickle")

diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py
index c29f91bc5c..696501d1b9 100644
--- a/tests/system/small/test_series.py
+++ b/tests/system/small/test_series.py
@@ -2790,6 +2790,15 @@ def test_to_frame(scalars_dfs):
     assert_pandas_df_equal(bf_result, pd_result)


+def test_to_frame_no_name(scalars_dfs):
+    scalars_df, scalars_pandas_df = scalars_dfs
+
+    bf_result = scalars_df["int64_col"].rename(None).to_frame().to_pandas()
+    pd_result = scalars_pandas_df["int64_col"].rename(None).to_frame()
+
+    assert_pandas_df_equal(bf_result, pd_result)
+
+
 def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
     path = gcs_folder + "test_series_to_json*.jsonl"
     scalars_df_index["int64_col"].to_json(path, lines=True, orient="records")

From cc792d2d8036eb5b892033f299cb332a1fbf41dc Mon Sep 17 00:00:00 2001
From: jialuoo
Date: Wed, 30 Oct 2024 12:26:46 -0700
Subject: [PATCH 05/20] refactor: sort the region and location in constants.py
 (#1124)

---
 bigframes/constants.py | 61 ++++++++++++++++++++----------------------
 1 file changed, 29 insertions(+), 32 deletions(-)

diff --git a/bigframes/constants.py b/bigframes/constants.py
index f344c6ecee..dbc24401a7 100644
--- a/bigframes/constants.py
+++ b/bigframes/constants.py
@@ -24,55 +24,52 @@
 # https://cloud.google.com/bigquery/docs/locations
 BIGQUERY_REGIONS = frozenset(
     {
-        "us-east5",
-        "us-south1",
-        "us-central1",
-        "us-west4",
-        "us-west2",
-        "northamerica-northeast1",
-        "us-east4",
-        "us-west1",
-        "us-west3",
-        "southamerica-east1",
-        "southamerica-west1",
-        "us-east1",
-        "northamerica-northeast2",
-        "asia-south2",
+        "africa-south1",
+        "asia-east1",
         "asia-east2",
-        "asia-southeast2",
-        "australia-southeast2",
-        "asia-south1",
+        "asia-northeast1",
         "asia-northeast2",
         "asia-northeast3",
+        "asia-south1",
+        "asia-south2",
         "asia-southeast1",
+        "asia-southeast2",
         "australia-southeast1",
-        "asia-east1",
-        "asia-northeast1",
+        "australia-southeast2",
+        "europe-central2",
+        "europe-north1",
+        "europe-southwest1",
         "europe-west1",
         "europe-west10",
-        "europe-north1",
-        "europe-west3",
+        "europe-west12",
         "europe-west2",
-        "europe-southwest1",
-        "europe-west8",
+        "europe-west3",
         "europe-west4",
-        "europe-west9",
-        "europe-west12",
-        "europe-central2",
         "europe-west6",
-        "me-central2",
+        "europe-west8",
+        "europe-west9",
         "me-central1",
-        "me-west1",
         "me-central2",
-        "me-central1",
         "me-west1",
-        "africa-south1",
+        "northamerica-northeast1",
+        "northamerica-northeast2",
+        "southamerica-east1",
+        "southamerica-west1",
+        "us-central1",
+        "us-east1",
+        "us-east4",
+        "us-east5",
+        "us-south1",
+        "us-west1",
+        "us-west2",
+        "us-west3",
+        "us-west4",
     }
 )
 BIGQUERY_MULTIREGIONS = frozenset(
     {
-        "US",
         "EU",
+        "US",
     }
 )
 ALL_BIGQUERY_LOCATIONS = frozenset(BIGQUERY_REGIONS.union(BIGQUERY_MULTIREGIONS))
@@ -81,8 +78,8 @@
 REP_ENABLED_BIGQUERY_LOCATIONS = frozenset(
     {
"europe-west3", - "europe-west9", "europe-west8", + "europe-west9", "me-central2", "us-central1", "us-central2", From 72c228b15627e6047d60ae42740563a6dfea73da Mon Sep 17 00:00:00 2001 From: jialuoo Date: Wed, 30 Oct 2024 12:45:39 -0700 Subject: [PATCH 06/20] fix: update the API documentation with newly added rep (#1120) * fix: update the API documentaton with newly added rep * fix the order --- bigframes/_config/bigquery_options.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 2fdd7d6feb..90f03c83cd 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -235,8 +235,10 @@ def use_regional_endpoints(self) -> bool: .. note:: Use of regional endpoints is a feature in Preview and available only - in regions "europe-west3", "europe-west9", "europe-west8", - "me-central2", "us-east4" and "us-west1". + in regions "europe-west3", "europe-west8", "europe-west9", + "me-central2", "us-central1", "us-central2", "us-east1", "us-east4", + "us-east5", "us-east7", "us-south1", "us-west1", "us-west2", "us-west3" + and "us-west4". .. deprecated:: 0.13.0 Use of locational endpoints is available only in selected projects. From 3759c6397eaa3c46c4142aa51ca22be3dc8e4971 Mon Sep 17 00:00:00 2001 From: rey-esp Date: Thu, 31 Oct 2024 12:12:58 -0500 Subject: [PATCH 07/20] feat: add basic geopandas functionality (#962) * feat: add basic geopandas functionality * update examples for geoseries * feat: add Series.geo helper to convert Series to a GeoSeries * fix cirucular import * Added a constructor * add documentation for geoseries * remove GeoSeries.x and GeoSeries.y temporarily --------- Co-authored-by: Arwa --- bigframes/geopandas/__init__.py | 17 +++++++++++ bigframes/geopandas/geoseries.py | 28 +++++++++++++++++ bigframes/series.py | 17 +++++++++++ .../bigframes.geopandas/geoseries.rst | 17 +++++++++++ docs/reference/bigframes.geopandas/index.rst | 9 ++++++ docs/reference/index.rst | 5 ++-- .../bigframes_vendored/geopandas/LICENSE.txt | 25 ++++++++++++++++ .../bigframes_vendored/geopandas/geoseries.py | 30 +++++++++++++++++++ 8 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 bigframes/geopandas/__init__.py create mode 100644 bigframes/geopandas/geoseries.py create mode 100644 docs/reference/bigframes.geopandas/geoseries.rst create mode 100644 docs/reference/bigframes.geopandas/index.rst create mode 100644 third_party/bigframes_vendored/geopandas/LICENSE.txt create mode 100644 third_party/bigframes_vendored/geopandas/geoseries.py diff --git a/bigframes/geopandas/__init__.py b/bigframes/geopandas/__init__.py new file mode 100644 index 0000000000..08966ba923 --- /dev/null +++ b/bigframes/geopandas/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from bigframes.geopandas.geoseries import GeoSeries

__all__ = ["GeoSeries"]

diff --git a/bigframes/geopandas/geoseries.py b/bigframes/geopandas/geoseries.py
new file mode 100644
index 0000000000..959934e2c3
--- /dev/null
+++ b/bigframes/geopandas/geoseries.py
@@ -0,0 +1,28 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import bigframes_vendored.geopandas.geoseries as vendored_geoseries
import geopandas.array  # type: ignore

import bigframes.series


class GeoSeries(vendored_geoseries.GeoSeries, bigframes.series.Series):
    __doc__ = vendored_geoseries.GeoSeries.__doc__

    def __init__(self, data=None, index=None, **kwargs):
        super().__init__(
            data=data, index=index, dtype=geopandas.array.GeometryDtype(), **kwargs
        )

diff --git a/bigframes/series.py b/bigframes/series.py
index 571eee1534..1d44cdd963 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -60,6 +60,9 @@
 import bigframes.operations.strings as strings
 import bigframes.operations.structs as structs

+if typing.TYPE_CHECKING:
+    import bigframes.geopandas.geoseries
+
 LevelType = typing.Union[str, int]
 LevelsType = typing.Union[LevelType, typing.Sequence[LevelType]]
@@ -91,6 +94,20 @@ def dtype(self):
     def dtypes(self):
         return self._dtype

+    @property
+    def geo(self) -> bigframes.geopandas.geoseries.GeoSeries:
+        """
+        Accessor object for geography properties of the Series values.
+
+        Returns:
+            bigframes.geopandas.geoseries.GeoSeries:
+                An accessor containing geography methods.
+
+        """
+        import bigframes.geopandas.geoseries
+
+        return bigframes.geopandas.geoseries.GeoSeries(self)
+
     @property
     @validations.requires_index
     def loc(self) -> bigframes.core.indexers.LocSeriesIndexer:

diff --git a/docs/reference/bigframes.geopandas/geoseries.rst b/docs/reference/bigframes.geopandas/geoseries.rst
new file mode 100644
index 0000000000..1819613955
--- /dev/null
+++ b/docs/reference/bigframes.geopandas/geoseries.rst
@@ -0,0 +1,17 @@

=========
GeoSeries
=========

.. contents:: Table of Contents
    :depth: 2
    :local:
    :backlinks: none

Series
------

.. autoclass:: bigframes.geopandas.geoseries.GeoSeries
    :members:
    :inherited-members:
    :undoc-members:

diff --git a/docs/reference/bigframes.geopandas/index.rst b/docs/reference/bigframes.geopandas/index.rst
new file mode 100644
index 0000000000..e33946461c
--- /dev/null
+++ b/docs/reference/bigframes.geopandas/index.rst
@@ -0,0 +1,9 @@

===============================
BigQuery DataFrames (geopandas)
===============================

.. toctree::
    :maxdepth: 2

    geoseries

diff --git a/docs/reference/index.rst b/docs/reference/index.rst
index eb5a774b29..a0f96f751a 100644
--- a/docs/reference/index.rst
+++ b/docs/reference/index.rst
@@ -8,7 +8,8 @@ packages.
     :maxdepth: 2

     bigframes/index
-    bigframes.pandas/index
-    bigframes.ml/index
     bigframes.bigquery/index
+    bigframes.geopandas/index
+    bigframes.ml/index
+    bigframes.pandas/index
     bigframes.streaming/index

diff --git a/third_party/bigframes_vendored/geopandas/LICENSE.txt b/third_party/bigframes_vendored/geopandas/LICENSE.txt
new file mode 100644
index 0000000000..028603be20
--- /dev/null
+++ b/third_party/bigframes_vendored/geopandas/LICENSE.txt
@@ -0,0 +1,25 @@
Copyright (c) 2013-2022, GeoPandas developers.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

  * Redistributions of source code must retain the above copyright notice, this
    list of conditions and the following disclaimer.
  * Redistributions in binary form must reproduce the above copyright notice,
    this list of conditions and the following disclaimer in the documentation
    and/or other materials provided with the distribution.
  * Neither the name of GeoPandas nor the names of its contributors may
    be used to endorse or promote products derived from this software without
    specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

diff --git a/third_party/bigframes_vendored/geopandas/geoseries.py b/third_party/bigframes_vendored/geopandas/geoseries.py
new file mode 100644
index 0000000000..81d4e94600
--- /dev/null
+++ b/third_party/bigframes_vendored/geopandas/geoseries.py
@@ -0,0 +1,30 @@
# contains code from https://github.com/geopandas/geopandas/blob/main/geopandas/geoseries.py
from __future__ import annotations


class GeoSeries:
    """
    A Series object designed to store geometry objects.

    **Examples:**

        >>> import bigframes.geopandas
        >>> import bigframes.pandas as bpd
        >>> bpd.options.display.progress_bar = None
        >>> from shapely.geometry import Point
        >>> s = bigframes.geopandas.GeoSeries([Point(1, 1), Point(2, 2), Point(3, 3)])
        >>> s
        0    POINT (1 1)
        1    POINT (2 2)
        2    POINT (3 3)
        dtype: geometry

    Args:
        data (array-like, dict, scalar value, bigframes.pandas.Series):
            The geometries to store in the GeoSeries.
        index (array-like, pandas.Index, bigframes.pandas.Index):
            The index for the GeoSeries.
        kwargs (dict):
            Additional arguments passed to the Series constructor,
            e.g. ``name``.
+ """ From 32274b130849b37d7e587643cf7b6d109455ff38 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Thu, 31 Oct 2024 10:39:08 -0700 Subject: [PATCH 08/20] perf: Reduce CURRENT_TIMESTAMP queries (#1114) * perf: Reduce CURRENT_TIMESTAMP queries * fix unit tests * add unit tests with freezegun --- .../session/_io/bigquery/read_gbq_table.py | 18 +---- bigframes/session/loader.py | 5 +- bigframes/session/time.py | 59 ++++++++++++++++ noxfile.py | 1 + tests/unit/resources.py | 9 +++ tests/unit/session/test_time.py | 69 +++++++++++++++++++ 6 files changed, 144 insertions(+), 17 deletions(-) create mode 100644 bigframes/session/time.py create mode 100644 tests/unit/session/test_time.py diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 01ff1a3f15..f1d2b8f517 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -45,8 +45,8 @@ def get_table_metadata( bqclient: bigquery.Client, table_ref: google.cloud.bigquery.table.TableReference, + bq_time: datetime.datetime, *, - api_name: str, cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], use_cache: bool = True, ) -> Tuple[datetime.datetime, google.cloud.bigquery.table.Table]: @@ -76,23 +76,9 @@ def get_table_metadata( ) return cached_table - # TODO(swast): It's possible that the table metadata is changed between now - # and when we run the CURRENT_TIMESTAMP() query to see when we can time - # travel to. Find a way to fetch the table metadata and BQ's current time - # atomically. table = bqclient.get_table(table_ref) - # TODO(swast): Use session._start_query instead? - # TODO(swast): Use query_and_wait since we know these are small results. - job_config = bigquery.QueryJobConfig() - bigframes.session._io.bigquery.add_labels(job_config, api_name=api_name) - snapshot_timestamp = list( - bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - cached_table = (snapshot_timestamp, table) + cached_table = (bq_time, table) cache[table_ref] = cached_table return cached_table diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 21d454d72f..2a6b59fa4b 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -59,6 +59,7 @@ import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temp_storage +import bigframes.session.time as session_time import bigframes.version # Avoid circular imports. @@ -128,6 +129,8 @@ def __init__( self._metrics = metrics # Unfortunate circular reference, but need to pass reference when constructing objects self._session = session + self._clock = session_time.BigQuerySyncedClock(bqclient) + self._clock.sync() def read_pandas_load_job( self, pandas_dataframe: pandas.DataFrame, api_name: str @@ -246,7 +249,7 @@ def read_gbq_table( time_travel_timestamp, table = bf_read_gbq_table.get_table_metadata( self._bqclient, table_ref=table_ref, - api_name=api_name, + bq_time=self._clock.get_time(), cache=self._df_snapshot, use_cache=use_cache, ) diff --git a/bigframes/session/time.py b/bigframes/session/time.py new file mode 100644 index 0000000000..bef4bbc17f --- /dev/null +++ b/bigframes/session/time.py @@ -0,0 +1,59 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import threading
import time
from typing import cast, Optional

import google.cloud.bigquery as bigquery

MIN_RESYNC_SECONDS = 100


class BigQuerySyncedClock:
    """
    Local clock that attempts to synchronize its time with the bigquery service.
    """

    def __init__(self, bqclient: bigquery.Client):
        self._bqclient = bqclient
        self._sync_lock = threading.Lock()
        self._sync_remote_time: Optional[datetime.datetime] = None
        self._sync_monotonic_time: Optional[float] = None

    def get_time(self):
        if (self._sync_monotonic_time is None) or (self._sync_remote_time is None):
            self.sync()
        assert self._sync_remote_time is not None
        assert self._sync_monotonic_time is not None
        return self._sync_remote_time + datetime.timedelta(
            seconds=time.monotonic() - self._sync_monotonic_time
        )

    def sync(self):
        with self._sync_lock:
            if (self._sync_monotonic_time is not None) and (
                time.monotonic() - self._sync_monotonic_time
            ) < MIN_RESYNC_SECONDS:
                return
            current_bq_time = list(
                next(
                    self._bqclient.query_and_wait(
                        "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`",
                    )
                )
            )[0]
            self._sync_remote_time = cast(datetime.datetime, current_bq_time)
            self._sync_monotonic_time = time.monotonic()

diff --git a/noxfile.py b/noxfile.py
index f537005e57..24aec29c6c 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -51,6 +51,7 @@
 UNIT_TEST_STANDARD_DEPENDENCIES = [
     "mock",
     "asyncmock",
+    "freezegun",
     PYTEST_VERSION,
     "pytest-cov",
     "pytest-asyncio",

diff --git a/tests/unit/resources.py b/tests/unit/resources.py
index d45da82ab9..04db840b28 100644
--- a/tests/unit/resources.py
+++ b/tests/unit/resources.py
@@ -91,7 +91,16 @@
     def query_mock(query, *args, **kwargs):
         return query_job

+    existing_query_and_wait = bqclient.query_and_wait
+
+    def query_and_wait_mock(query, *args, **kwargs):
+        if query.startswith("SELECT CURRENT_TIMESTAMP()"):
+            return iter([[datetime.datetime.now()]])
+        else:
+            return existing_query_and_wait(query, *args, **kwargs)
+
     bqclient.query = query_mock
+    bqclient.query_and_wait = query_and_wait_mock

     clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider)
     type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient)

diff --git a/tests/unit/session/test_time.py b/tests/unit/session/test_time.py
new file mode 100644
index 0000000000..87766e79bb
--- /dev/null
+++ b/tests/unit/session/test_time.py
@@ -0,0 +1,69 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import unittest.mock as mock

import freezegun
import google.cloud.bigquery
import pytest

import bigframes.session.time

INITIAL_BQ_TIME = datetime.datetime(
    year=2020,
    month=4,
    day=24,
    hour=8,
    minute=55,
    second=29,
    tzinfo=datetime.timezone.utc,
)


@pytest.fixture()
def bq_client():
    bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True)

    def query_and_wait_mock(query, *args, **kwargs):
        if query.startswith("SELECT CURRENT_TIMESTAMP()"):
            return iter([[INITIAL_BQ_TIME]])
        else:
            return ValueError(f"mock cannot handle query : {query}")

    bqclient.query_and_wait = query_and_wait_mock
    return bqclient


def test_bqsyncedclock_get_time(bq_client):
    # this initial local time is actually irrelevant, only the ticks matter
    initial_local_datetime = datetime.datetime(
        year=1, month=7, day=12, hour=15, minute=6, second=3
    )

    with freezegun.freeze_time(initial_local_datetime) as frozen_datetime:
        clock = bigframes.session.time.BigQuerySyncedClock(bq_client)

        t1 = clock.get_time()
        assert t1 == INITIAL_BQ_TIME

        frozen_datetime.tick(datetime.timedelta(seconds=3))
        t2 = clock.get_time()
        assert t2 == INITIAL_BQ_TIME + datetime.timedelta(seconds=3)

        frozen_datetime.tick(datetime.timedelta(seconds=23529385))
        t3 = clock.get_time()
        assert t3 == INITIAL_BQ_TIME + datetime.timedelta(
            seconds=3
        ) + datetime.timedelta(seconds=23529385)

From 8033dc579c28d3508a7d9f253aae7f58437ffe83 Mon Sep 17 00:00:00 2001
From: Huan Chen <142538604+Genesis929@users.noreply.github.com>
Date: Thu, 31 Oct 2024 15:28:46 -0700
Subject: [PATCH 09/20] chore: update tpch q7/q14 (#1128)

---
 .../bigframes_vendored/tpch/queries/q14.py    | 19 ++++++++++++----
 .../bigframes_vendored/tpch/queries/q7.py     | 22 ++++++-------------
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/third_party/bigframes_vendored/tpch/queries/q14.py b/third_party/bigframes_vendored/tpch/queries/q14.py
index 27f3d9e224..e2a5a73214 100644
--- a/third_party/bigframes_vendored/tpch/queries/q14.py
+++ b/third_party/bigframes_vendored/tpch/queries/q14.py
@@ -26,9 +26,20 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
         filtered["L_EXTENDEDPRICE"] * (1 - filtered["L_DISCOUNT"])
     ) * filtered["P_TYPE"].str.contains("PROMO").astype("Int64")

-    total_revenue = (filtered["L_EXTENDEDPRICE"] * (1 - filtered["L_DISCOUNT"])).sum()
-    promo_revenue = filtered["CONDI_REVENUE"].sum()
+    total_revenue = (
+        (filtered["L_EXTENDEDPRICE"] * (1 - filtered["L_DISCOUNT"]))
+        .to_frame(name="TEMP")
+        .sum()
+    )
+
+    promo_revenue = filtered["CONDI_REVENUE"].to_frame(name="TEMP").sum()

-    promo_revenue_percent = 100.00 * promo_revenue / total_revenue
+    promo_revenue_percent = (
+        (100.00 * promo_revenue / total_revenue)
+        .sort_index()
+        .reset_index(drop=True)
+        .round(2)
+        .to_frame(name="PROMO_REVENUE")
+    )

-    _ = round(promo_revenue_percent, 2)
+    promo_revenue_percent.to_gbq()

diff --git a/third_party/bigframes_vendored/tpch/queries/q7.py b/third_party/bigframes_vendored/tpch/queries/q7.py
index a4dfe3f12e..93047dc299 100644
--- a/third_party/bigframes_vendored/tpch/queries/q7.py
+++ b/third_party/bigframes_vendored/tpch/queries/q7.py
@@ -35,28 +35,20 @@ def q(project_id: str, dataset_id: str, session: bigframes.Session):
     var3 = date(1995, 1, 1)
     var4 = date(1996, 12, 31)

-    n1 = nation[(nation["N_NAME"] == var1)]
-    n2 = nation[(nation["N_NAME"] == var2)]
+    nation = nation[nation["N_NAME"].isin([var1, var2])]
+    lineitem = lineitem[
+        (lineitem["L_SHIPDATE"] >= var3) & (lineitem["L_SHIPDATE"] <= var4)
+    ]

-    jn1 = customer.merge(n1, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
+    jn1 = customer.merge(nation, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
     jn2 = jn1.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
     jn2 = jn2.rename(columns={"N_NAME": "CUST_NATION"})
     jn3 = jn2.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
     jn4 = jn3.merge(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
-    jn5 = jn4.merge(n2, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
+    jn5 = jn4.merge(nation, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
     df1 = jn5.rename(columns={"N_NAME": "SUPP_NATION"})
+    total = df1[df1["CUST_NATION"] != df1["SUPP_NATION"]]

-    jn1 = customer.merge(n2, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
-    jn2 = jn1.merge(orders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
-    jn2 = jn2.rename(columns={"N_NAME": "CUST_NATION"})
-    jn3 = jn2.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
-    jn4 = jn3.merge(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
-    jn5 = jn4.merge(n1, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
-    df2 = jn5.rename(columns={"N_NAME": "SUPP_NATION"})
-
-    total = bpd.concat([df1, df2])
-
-    total = total[(total["L_SHIPDATE"] >= var3) & (total["L_SHIPDATE"] <= var4)]
     total["VOLUME"] = total["L_EXTENDEDPRICE"] * (1.0 - total["L_DISCOUNT"])
     total["L_YEAR"] = total["L_SHIPDATE"].dt.year

From f7e435488d630cf4cf493c89ecdde94a95a7a0d7 Mon Sep 17 00:00:00 2001
From: TrevorBergeron
Date: Fri, 1 Nov 2024 12:46:47 -0700
Subject: [PATCH 10/20] perf: Reduce dry runs from read_gbq with table (#1129)

* perf: Reduce dry runs from read_gbq with table

* add check for table creation time to not travel before that

* handle table.created is None
---
 .../session/_io/bigquery/read_gbq_table.py    | 83 ++++++++++---------
 bigframes/session/loader.py                   | 20 +++--
 tests/unit/session/test_session.py            |  1 +
 3 files changed, 58 insertions(+), 46 deletions(-)

diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index f1d2b8f517..4044b7bf43 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -77,6 +77,11 @@ def get_table_metadata(
         return cached_table

     table = bqclient.get_table(table_ref)
+    # local time will lag a little bit due to network latency
+    # make sure it is at least table creation time.
+    # This is relevant if the table was created immediately before loading it here.
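+    # In that case, clamp the snapshot time forward to the creation time so time
+    # travel never points to a moment before the table existed.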
+    if (table.created is not None) and (table.created > bq_time):
+        bq_time = table.created

     cached_table = (bq_time, table)
     cache[table_ref] = cached_table
@@ -85,56 +90,58 @@

 def validate_table(
     bqclient: bigquery.Client,
-    table_ref: bigquery.table.TableReference,
+    table: bigquery.table.Table,
     columns: Optional[Sequence[str]],
     snapshot_time: datetime.datetime,
-    table_type: str,
     filter_str: Optional[str] = None,
 ) -> bool:
     """Validates that the table can be read, returns True iff snapshot is supported."""
-    # First run without snapshot to verify table can be read
-    sql = bigframes.session._io.bigquery.to_query(
-        query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
-        columns=columns or (),
-        sql_predicate=filter_str,
-    )
-    dry_run_config = bigquery.QueryJobConfig()
-    dry_run_config.dry_run = True
-    try:
-        bqclient.query_and_wait(sql, job_config=dry_run_config)
-    except google.api_core.exceptions.Forbidden as ex:
-        if "Drive credentials" in ex.message:
-            ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
-        raise
+    time_travel_not_found = False
     # Anonymous dataset, does not support snapshot ever
-    if table_ref.dataset_id.startswith("_"):
-        return False
-
-    # Materialized views,does not support snapshot
-    if table_type == "MATERIALIZED_VIEW":
-        warnings.warn(
-            "Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
-            "Attempting query without time travel. Be aware that as materialized views "
-            "are updated periodically, modifications to the underlying data in the view may "
-            "result in errors or unexpected behavior.",
-            category=bigframes.exceptions.TimeTravelDisabledWarning,
-        )
-        return False
+    if table.dataset_id.startswith("_"):
+        pass
+    # Only true tables support time travel
+    elif table.table_type != "TABLE":
+        if table.table_type == "MATERIALIZED_VIEW":
+            warnings.warn(
+                "Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
+                "Attempting query without time travel. Be aware that as materialized views "
+                "are updated periodically, modifications to the underlying data in the view may "
+                "result in errors or unexpected behavior.",
+                category=bigframes.exceptions.TimeTravelDisabledWarning,
+            )
+    else:
+        # table might support time travel, lets do a dry-run query with time travel
+        snapshot_sql = bigframes.session._io.bigquery.to_query(
+            query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
+            columns=columns or (),
+            sql_predicate=filter_str,
+            time_travel_timestamp=snapshot_time,
+        )
+        try:
+            # If this succeeds, we don't need to query without time travel, that would surely succeed
+            bqclient.query_and_wait(
+                snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
+            )
+            return True
+        except google.api_core.exceptions.NotFound:
+            # note that a notfound caused by a simple typo will be
+            # caught above when the metadata is fetched, not here
+            time_travel_not_found = True

-    # Second, try with snapshot to verify table supports this feature
+    # At this point, time travel is known to fail, but can we query without time travel?
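+    # A dry run of the same query without the time travel clause answers that.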
     snapshot_sql = bigframes.session._io.bigquery.to_query(
-        query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
+        query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
         columns=columns or (),
         sql_predicate=filter_str,
-        time_travel_timestamp=snapshot_time,
+        time_travel_timestamp=None,
     )
-    try:
-        bqclient.query_and_wait(snapshot_sql, job_config=dry_run_config)
-        return True
-    except google.api_core.exceptions.NotFound:
-        # note that a notfound caused by a simple typo will be
-        # caught above when the metadata is fetched, not here
+    # Any errors here should just be raised to user
+    bqclient.query_and_wait(
+        snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
+    )
+    if time_travel_not_found:
         warnings.warn(
             "NotFound error when reading table with time travel."
             " Attempting query without time travel. Warning: Without"
             " time travel, modifications to the underlying table may"
             " result in errors or unexpected behavior.",
             category=bigframes.exceptions.TimeTravelDisabledWarning,
         )
-        return False
+    return False

diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index 2a6b59fa4b..90c7d2dc9e 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -343,14 +343,18 @@ def read_gbq_table(
             else (*columns, *[col for col in index_cols if col not in columns])
         )

-        enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
-            self._bqclient,
-            table_ref,
-            all_columns,
-            time_travel_timestamp,
-            table.table_type,
-            filter_str,
-        )
+        try:
+            enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
+                self._bqclient,
+                table,
+                all_columns,
+                time_travel_timestamp,
+                filter_str,
+            )
+        except google.api_core.exceptions.Forbidden as ex:
+            if "Drive credentials" in ex.message:
+                ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
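+            # Re-raise so the caller still sees the original error, now with the hint attached.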
+            raise

         # ----------------------------
         # Create ordering and validate

diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index b76c74654c..1cfa134b15 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -181,6 +181,7 @@ def test_read_gbq_cached_table():
     table._properties["location"] = session._location
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
+    table._properties["type"] = "TABLE"
     session._loader._df_snapshot[table_ref] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,

From e7fa9132fbd06dc4bc4b5f51692c49ae3a684f1f Mon Sep 17 00:00:00 2001
From: TrevorBergeron
Date: Tue, 5 Nov 2024 13:54:09 -0800
Subject: [PATCH 11/20] refactor: Add function to make all column ids in a
 tree unique and sequential (#1094)

---
 bigframes/core/__init__.py              |   8 +-
 bigframes/core/blocks.py                |   2 +-
 bigframes/core/compile/api.py           |  28 +-
 bigframes/core/compile/compiled.py      |  51 +--
 bigframes/core/compile/compiler.py      |  70 ++++-
 bigframes/core/compile/concat.py        |  16 +-
 bigframes/core/compile/single_column.py |   7 +-
 bigframes/core/expression.py            |  67 +++-
 bigframes/core/identifiers.py           |  31 +-
 bigframes/core/nodes.py                 | 399 +++++++++++++++++++++---
 bigframes/core/ordering.py              |   5 +
 bigframes/core/rewrite.py               |  38 ++-
 bigframes/core/window_spec.py           |  20 +-
 13 files changed, 614 insertions(+), 128 deletions(-)

diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 485a9d79a7..71b1214d01 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -268,7 +268,13 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
     def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue:
         """Append together multiple ArrayValue objects."""
         return ArrayValue(
-            nodes.ConcatNode(children=tuple([self.node, *[val.node for val in other]]))
+            nodes.ConcatNode(
+                children=tuple([self.node, *[val.node for val in other]]),
+                output_ids=tuple(
+                    ids.ColumnId(bigframes.core.guid.generate_guid())
+                    for id in self.column_ids
+                ),
+            )
         )

     def compute_values(self, assignments: Sequence[ex.Expression]):

diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index b0a8903e19..2648c9993f 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -3139,7 +3139,7 @@ def _pd_index_to_array_value(
     rows = []
     labels_as_tuples = utils.index_as_tuples(index)
     for row_offset in range(len(index)):
-        id_gen = bigframes.core.identifiers.standard_identifiers()
+        id_gen = bigframes.core.identifiers.standard_id_strings()
         row_label = labels_as_tuples[row_offset]
         row_label = (row_label,) if not isinstance(row_label, tuple) else row_label
         row = {}

diff --git a/bigframes/core/compile/api.py b/bigframes/core/compile/api.py
index 86c8fca25a..61eaa63f85 100644
--- a/bigframes/core/compile/api.py
+++ b/bigframes/core/compile/api.py
@@ -18,14 +18,15 @@
 import google.cloud.bigquery as bigquery

 import bigframes.core.compile.compiler as compiler
-import bigframes.core.rewrite as rewrites

 if TYPE_CHECKING:
     import bigframes.core.nodes
     import bigframes.core.ordering
     import bigframes.core.schema

-_STRICT_COMPILER = compiler.Compiler(strict=True)
+_STRICT_COMPILER = compiler.Compiler(
+    strict=True, enable_pruning=True, enable_densify_ids=True
+)


 class SQLCompiler:
     def __init__(self, strict: bool = True):
         self._compiler = compiler.Compiler(strict=strict)

     def compile_peek(self, node: bigframes.core.nodes.BigFrameNode, n_rows: int) -> str:
         """Compile node into sql that selects N arbitrary rows, may not
         execute deterministically."""
-        return self._compiler.compile_unordered_ir(node).peek_sql(n_rows)
+        return self._compiler.compile_peek_sql(node, n_rows)

     def compile_unordered(
         self,
         node: bigframes.core.nodes.BigFrameNode,
         *,
         col_id_overrides: Mapping[str, str] = {},
     ) -> str:
         """Compile node into sql where rows are unsorted, and no ordering information is preserved."""
         # TODO: Enable limit pullup, but only if not being used to write to clustered table.
-        return self._compiler.compile_unordered_ir(node).to_sql(
-            col_id_overrides=col_id_overrides
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=False, output_ids=output_ids)

     def compile_ordered(
         self,
         node: bigframes.core.nodes.BigFrameNode,
         *,
         col_id_overrides: Mapping[str, str] = {},
     ) -> str:
         """Compile node into sql where rows are sorted with ORDER BY."""
         # If we are ordering the query anyways, compiling the slice as a limit is probably a good idea.
-        new_node, limit = rewrites.pullup_limit_from_slice(node)
-        return self._compiler.compile_ordered_ir(new_node).to_sql(
-            col_id_overrides=col_id_overrides, ordered=True, limit=limit
-        )
+        output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
+        return self._compiler.compile_sql(node, ordered=True, output_ids=output_ids)

     def compile_raw(
         self,
         node: bigframes.core.nodes.BigFrameNode,
     ) -> typing.Tuple[
         str, Sequence[bigquery.SchemaField], bigframes.core.ordering.RowOrdering
     ]:
         """Compile node into sql that exposes all columns, including hidden ordering-only columns."""
-        ir = self._compiler.compile_ordered_ir(node)
-        sql, schema = ir.raw_sql_and_schema()
-        return sql, schema, ir._ordering
+        return self._compiler.compile_raw(node)


 def test_only_try_evaluate(node: bigframes.core.nodes.BigFrameNode):
     """Use only for unit testing paths - not fully featured. Will throw exception if fails."""
+    node = _STRICT_COMPILER._preprocess(node)
     ibis = _STRICT_COMPILER.compile_ordered_ir(node)._to_ibis_expr(
         ordering_mode="unordered"
     )


 def test_only_ibis_inferred_schema(node: bigframes.core.nodes.BigFrameNode):
     """Use only for testing paths to ensure ibis inferred schema does not diverge from bigframes inferred schema."""
     import bigframes.core.schema

+    node = _STRICT_COMPILER._preprocess(node)
     compiled = _STRICT_COMPILER.compile_unordered_ir(node)
     items = tuple(
-        bigframes.core.schema.SchemaItem(id, compiled.get_column_type(id))
-        for id in compiled.column_ids
+        bigframes.core.schema.SchemaItem(name, compiled.get_column_type(ibis_id))
+        for name, ibis_id in zip(node.schema.names, compiled.column_ids)
     )
     return bigframes.core.schema.ArraySchema(items)

diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index d02a2c444c..d2783a07e2 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -202,7 +202,12 @@ def _aggregate_base(
     )
         # Must have deterministic ordering, so order by the unique "by" column
         ordering = TotalOrdering(
-            tuple([OrderingExpression(column_id) for column_id in by_column_ids]),
+            tuple(
+                [
+                    OrderingExpression(ex.DerefOp(ref.id.local_normalized))
+                    for ref in by_column_ids
+                ]
+            ),
             total_ordering_columns=frozenset(
                 [ex.DerefOp(ref.id.local_normalized) for ref in by_column_ids]
             ),
@@ -266,31 +271,26 @@ def peek_sql(self, n: int):
     def to_sql(
         self,
         offset_column: typing.Optional[str] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
     ) -> str:
         if offset_column or ordered:
             raise ValueError("Cannot produce sorted sql in partial ordering mode")
-        sql = ibis_bigquery.Backend().compile(
-            self._to_ibis_expr(
-                col_id_overrides=col_id_overrides,
-            )
-        )
+        sql = ibis_bigquery.Backend().compile(self._to_ibis_expr())
         return typing.cast(str, sql)

-    def row_count(self) -> OrderedIR:
+    def row_count(self, name: str) -> OrderedIR:
         original_table = self._to_ibis_expr()
         ibis_table = original_table.agg(
             [
-                original_table.count().name("count"),
+                original_table.count().name(name),
             ]
         )
         return OrderedIR(
             ibis_table,
-            (ibis_table["count"],),
+            (ibis_table[name],),
             ordering=TotalOrdering(
-                ordering_value_columns=(ascending_over("count"),),
-                total_ordering_columns=frozenset([ex.deref("count")]),
+                ordering_value_columns=(ascending_over(name),),
+                total_ordering_columns=frozenset([ex.deref(name)]),
             ),
         )
@@ -299,7 +299,6 @@ def _to_ibis_expr(
         self,
         *,
         expose_hidden_cols: bool = False,
         fraction: Optional[float] = None,
-        col_id_overrides: typing.Mapping[str, str] = {},
     ):
         """
         Creates an Ibis table expression representing the DataFrame.
@@ -320,8 +319,6 @@
                 If True, include the hidden ordering columns in the results.
                 Only compatible with `order_by` and `unordered`
                 ``ordering_mode``.
-            col_id_overrides:
-                overrides the column ids for the result
         Returns:
             An ibis expression representing the data help by the ArrayValue object.
         """
@@ -346,10 +343,6 @@
         if self._reduced_predicate is not None:
             table = table.filter(base_table[PREDICATE_COLUMN])
         table = table.drop(*columns_to_drop)
-        if col_id_overrides:
-            table = table.rename(
-                {value: key for key, value in col_id_overrides.items()}
-            )
         if fraction is not None:
             table = table.filter(ibis.random() < ibis.literal(fraction))
         return table
@@ -941,7 +934,6 @@ def _reproject_to_table(self) -> OrderedIR:

     def to_sql(
         self,
-        col_id_overrides: typing.Mapping[str, str] = {},
         ordered: bool = False,
         limit: Optional[int] = None,
     ) -> str:
@@ -951,17 +943,13 @@
             sql = ibis_bigquery.Backend().compile(
                 baked_ir._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=True,
                 )
             )
-            output_columns = [
-                col_id_overrides.get(col, col) for col in baked_ir.column_ids
-            ]
             sql = (
                 bigframes.core.compile.googlesql.Select()
                 .from_(sql)
-                .select(output_columns)
+                .select(self.column_ids)
                 .sql()
             )
@@ -979,7 +967,6 @@
             sql = ibis_bigquery.Backend().compile(
                 self._to_ibis_expr(
                     ordering_mode="unordered",
-                    col_id_overrides=col_id_overrides,
                     expose_hidden_cols=False,
                 )
             )
         return typing.cast(str, sql)

     def raw_sql_and_schema(
         self,
+        column_ids: typing.Sequence[str],
     ) -> typing.Tuple[str, typing.Sequence[google.cloud.bigquery.SchemaField]]:
         """Return sql with all hidden columns. Used to cache with ordering information.

         Also returns schema, as the extra ordering columns are determined compile-time.
""" + col_id_overrides = dict(zip(self.column_ids, column_ids)) all_columns = (*self.column_ids, *self._hidden_ordering_column_names.keys()) as_ibis = self._to_ibis_expr( ordering_mode="unordered", expose_hidden_cols=True, - ).select(all_columns) + ) + as_ibis = as_ibis.select(all_columns).rename(col_id_overrides) # Ibis will produce non-nullable schema types, but bigframes should always be nullable fixed_ibis_schema = ibis_schema.Schema.from_tuples( @@ -1013,7 +1003,6 @@ def _to_ibis_expr( *, expose_hidden_cols: bool = False, fraction: Optional[float] = None, - col_id_overrides: typing.Mapping[str, str] = {}, ordering_mode: Literal["string_encoded", "unordered"], order_col_name: Optional[str] = ORDER_ID_COLUMN, ): @@ -1043,8 +1032,6 @@ def _to_ibis_expr( order_col_name: If the ordering mode outputs a single ordering or offsets column, use this as the column name. - col_id_overrides: - overrides the column ids for the result Returns: An ibis expression representing the data help by the ArrayValue object. """ @@ -1086,10 +1073,6 @@ def _to_ibis_expr( if self._reduced_predicate is not None: table = table.filter(base_table[PREDICATE_COLUMN]) table = table.drop(*columns_to_drop) - if col_id_overrides: - table = table.rename( - {value: key for key, value in col_id_overrides.items()} - ) if fraction is not None: table = table.filter(ibis.random() < ibis.literal(fraction)) return table diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 1fa727780a..66fde9b874 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -18,6 +18,7 @@ import io import typing +import google.cloud.bigquery import ibis import ibis.backends import ibis.backends.bigquery @@ -32,6 +33,7 @@ import bigframes.core.compile.scalar_op_compiler as compile_scalar import bigframes.core.compile.schema_translator import bigframes.core.compile.single_column +import bigframes.core.expression as ex import bigframes.core.guid as guids import bigframes.core.identifiers as ids import bigframes.core.nodes as nodes @@ -50,31 +52,66 @@ class Compiler: strict: bool = True scalar_op_compiler = compile_scalar.ScalarOpCompiler() enable_pruning: bool = False + enable_densify_ids: bool = False + + def compile_sql( + self, node: nodes.BigFrameNode, ordered: bool, output_ids: typing.Sequence[str] + ) -> str: + node = self.set_output_names(node, output_ids) + if ordered: + node, limit = rewrites.pullup_limit_from_slice(node) + return self.compile_ordered_ir(self._preprocess(node)).to_sql( + ordered=True, limit=limit + ) + else: + return self.compile_unordered_ir(self._preprocess(node)).to_sql() + + def compile_peek_sql(self, node: nodes.BigFrameNode, n_rows: int) -> str: + return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows) + + def compile_raw( + self, + node: bigframes.core.nodes.BigFrameNode, + ) -> typing.Tuple[ + str, typing.Sequence[google.cloud.bigquery.SchemaField], bf_ordering.RowOrdering + ]: + ir = self.compile_ordered_ir(self._preprocess(node)) + sql, schema = ir.raw_sql_and_schema(column_ids=node.schema.names) + return sql, schema, ir._ordering def _preprocess(self, node: nodes.BigFrameNode): if self.enable_pruning: used_fields = frozenset(field.id for field in node.fields) node = node.prune(used_fields) node = functools.cache(rewrites.replace_slice_ops)(node) + if self.enable_densify_ids: + original_names = [id.name for id in node.ids] + node, _ = rewrites.remap_variables( + node, id_generator=ids.anonymous_serial_ids() + ) + node = 
self.set_output_names(node, original_names) return node - def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR: - ir = typing.cast( - compiled.OrderedIR, self.compile_node(self._preprocess(node), True) + def set_output_names( + self, node: bigframes.core.nodes.BigFrameNode, output_ids: typing.Sequence[str] + ): + # TODO: Create specialized output operators that will handle final names + return nodes.SelectionNode( + node, + tuple( + (ex.DerefOp(old_id), ids.ColumnId(out_id)) + for old_id, out_id in zip(node.ids, output_ids) + ), ) + + def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR: + ir = typing.cast(compiled.OrderedIR, self.compile_node(node, True)) if self.strict: assert ir.has_total_order return ir def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR: - return typing.cast( - compiled.UnorderedIR, self.compile_node(self._preprocess(node), False) - ) - - def compile_peak_sql( - self, node: nodes.BigFrameNode, n_rows: int - ) -> typing.Optional[str]: - return self.compile_unordered_ir(self._preprocess(node)).peek_sql(n_rows) + return typing.cast(compiled.UnorderedIR, self.compile_node(node, False)) # TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution) @functools.lru_cache(maxsize=5000) @@ -144,11 +181,11 @@ def compile_fromrange(self, node: nodes.FromRangeNode, ordered: bool = True): labels_array_table = ibis.range( joined_table[start_column], joined_table[end_column] + node.step, node.step - ).name("labels") + ).name(node.output_id.sql) labels = ( typing.cast(ibis.expr.types.ArrayValue, labels_array_table) .as_table() - .unnest(["labels"]) + .unnest([node.output_id.sql]) ) if ordered: return compiled.OrderedIR( @@ -307,18 +344,19 @@ def compile_projection(self, node: nodes.ProjectionNode, ordered: bool = True): @_compile_node.register def compile_concat(self, node: nodes.ConcatNode, ordered: bool = True): + output_ids = [id.sql for id in node.output_ids] if ordered: compiled_ordered = [self.compile_ordered_ir(node) for node in node.children] - return concat_impl.concat_ordered(compiled_ordered) + return concat_impl.concat_ordered(compiled_ordered, output_ids) else: compiled_unordered = [ self.compile_unordered_ir(node) for node in node.children ] - return concat_impl.concat_unordered(compiled_unordered) + return concat_impl.concat_unordered(compiled_unordered, output_ids) @_compile_node.register def compile_rowcount(self, node: nodes.RowCountNode, ordered: bool = True): - result = self.compile_unordered_ir(node.child).row_count() + result = self.compile_unordered_ir(node.child).row_count(name=node.col_id.sql) return result if ordered else result.to_unordered() @_compile_node.register diff --git a/bigframes/core/compile/concat.py b/bigframes/core/compile/concat.py index 81d6805d22..ea4b59ca0b 100644 --- a/bigframes/core/compile/concat.py +++ b/bigframes/core/compile/concat.py @@ -32,6 +32,7 @@ def concat_unordered( items: typing.Sequence[compiled.UnorderedIR], + output_ids: typing.Sequence[str], ) -> compiled.UnorderedIR: """Append together multiple ArrayValue objects.""" if len(items) == 1: @@ -39,9 +40,8 @@ def concat_unordered( tables = [] for expr in items: table = expr._to_ibis_expr() - # Rename the value columns based on horizontal offset before applying union. 
table = table.select( - [table[col].name(f"column_{i}") for i, col in enumerate(table.columns)] + [table[col].name(id) for id, col in zip(output_ids, table.columns)] ) tables.append(table) combined_table = ibis.union(*tables) @@ -53,6 +53,7 @@ def concat_unordered( def concat_ordered( items: typing.Sequence[compiled.OrderedIR], + output_ids: typing.Sequence[str], ) -> compiled.OrderedIR: """Append together multiple ArrayValue objects.""" if len(items) == 1: @@ -67,19 +68,22 @@ def concat_ordered( ) for i, expr in enumerate(items): ordering_prefix = str(i).zfill(prefix_size) + renames = { + old_id: new_id for old_id, new_id in zip(expr.column_ids, output_ids) + } table = expr._to_ibis_expr( - ordering_mode="string_encoded", order_col_name=ORDER_ID_COLUMN + ordering_mode="string_encoded", + order_col_name=ORDER_ID_COLUMN, ) - # Rename the value columns based on horizontal offset before applying union. table = table.select( [ - table[col].name(f"column_{i}") + table[col].name(renames[col]) if col != ORDER_ID_COLUMN else ( ordering_prefix + reencode_order_string(table[ORDER_ID_COLUMN], max_encoding_size) ).name(ORDER_ID_COLUMN) - for i, col in enumerate(table.columns) + for col in table.columns ] ) tables.append(table) diff --git a/bigframes/core/compile/single_column.py b/bigframes/core/compile/single_column.py index 6f2f3f5b6e..2ec0796760 100644 --- a/bigframes/core/compile/single_column.py +++ b/bigframes/core/compile/single_column.py @@ -55,6 +55,7 @@ def join_by_column_ordered( l_value_mapping = dict(zip(left.column_ids, left.column_ids)) r_value_mapping = dict(zip(right.column_ids, right.column_ids)) + # hidden columns aren't necessarily unique, so need to remap to guids l_hidden_mapping = { id: guids.generate_guid("hidden_") for id in left._hidden_column_ids } @@ -68,12 +69,14 @@ def join_by_column_ordered( left_table = left._to_ibis_expr( ordering_mode="unordered", expose_hidden_cols=True, - col_id_overrides=l_mapping, ) + left_table = left_table.rename({val: key for key, val in l_hidden_mapping.items()}) right_table = right._to_ibis_expr( ordering_mode="unordered", expose_hidden_cols=True, - col_id_overrides=r_mapping, + ) + right_table = right_table.rename( + {val: key for key, val in r_hidden_mapping.items()} ) join_conditions = [ value_to_join_key(left_table[l_mapping[left_index]]) diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index 9dee599a7c..3b7828bbf0 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -18,7 +18,7 @@ import dataclasses import itertools import typing -from typing import Mapping, Union +from typing import Mapping, TypeVar, Union import bigframes.core.identifiers as ids import bigframes.dtypes as dtypes @@ -56,6 +56,14 @@ def output_type( def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: return () + @abc.abstractmethod + def remap_column_refs( + self, + name_mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> Aggregation: + ... 
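+# Illustrative sketch, not part of the patch: with this abstract contract,
+# a rewrite pass can swap column ids on any Aggregation without knowing the
+# concrete subclass. Assuming `agg` is an Aggregation and `old`/`new` are
+# ColumnIds:
+#
+#     agg = agg.remap_column_refs({old: new}, allow_partial_bindings=True)
+#
+# With allow_partial_bindings=True, references missing from the mapping are
+# left untouched rather than treated as an error, which the tree-wide
+# variable remap added later in this patch relies on.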
+ @dataclasses.dataclass(frozen=True) class NullaryAggregation(Aggregation): @@ -66,6 +74,13 @@ def output_type( ) -> dtypes.ExpressionType: return self.op.output_type() + def remap_column_refs( + self, + name_mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> NullaryAggregation: + return self + @dataclasses.dataclass(frozen=True) class UnaryAggregation(Aggregation): @@ -81,6 +96,18 @@ def output_type( def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: return self.arg.column_references + def remap_column_refs( + self, + name_mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> UnaryAggregation: + return UnaryAggregation( + self.op, + self.arg.remap_column_refs( + name_mapping, allow_partial_bindings=allow_partial_bindings + ), + ) + @dataclasses.dataclass(frozen=True) class BinaryAggregation(Aggregation): @@ -99,6 +126,24 @@ def output_type( def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: return (*self.left.column_references, *self.right.column_references) + def remap_column_refs( + self, + name_mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> BinaryAggregation: + return BinaryAggregation( + self.op, + self.left.remap_column_refs( + name_mapping, allow_partial_bindings=allow_partial_bindings + ), + self.right.remap_column_refs( + name_mapping, allow_partial_bindings=allow_partial_bindings + ), + ) + + +TExpression = TypeVar("TExpression", bound="Expression") + @dataclasses.dataclass(frozen=True) class Expression(abc.ABC): @@ -109,14 +154,18 @@ def free_variables(self) -> typing.Tuple[str, ...]: return () @property + @abc.abstractmethod def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: - return () + ... 
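+# Editorial sketch (hypothetical ids): the TExpression self-type introduced
+# above means each subclass gets its own type back from the remap below, e.g.
+#
+#     ref = DerefOp(old_id)
+#     new_ref = ref.remap_column_refs({old_id: new_id})  # typed as DerefOp
+#
+# whereas the previous signature returned the base Expression and pushed
+# casts onto every call site.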
def remap_column_refs( - self, name_mapping: Mapping[ids.ColumnId, ids.ColumnId] - ) -> Expression: + self: TExpression, + name_mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> TExpression: return self.bind_refs( - {old_id: DerefOp(new_id) for old_id, new_id in name_mapping.items()} + {old_id: DerefOp(new_id) for old_id, new_id in name_mapping.items()}, # type: ignore + allow_partial_bindings=allow_partial_bindings, ) @property @@ -174,6 +223,10 @@ class ScalarConstantExpression(Expression): def is_const(self) -> bool: return True + @property + def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: + return () + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: @@ -211,6 +264,10 @@ def free_variables(self) -> typing.Tuple[str, ...]: def is_const(self) -> bool: return False + @property + def column_references(self) -> typing.Tuple[ids.ColumnId, ...]: + return () + def output_type( self, input_types: dict[ids.ColumnId, bigframes.dtypes.Dtype] ) -> dtypes.ExpressionType: diff --git a/bigframes/core/identifiers.py b/bigframes/core/identifiers.py index 0d2aaeb07c..8c2f7e910f 100644 --- a/bigframes/core/identifiers.py +++ b/bigframes/core/identifiers.py @@ -15,13 +15,14 @@ import dataclasses import functools +import itertools from typing import Generator -def standard_identifiers() -> Generator[str, None, None]: +def standard_id_strings(prefix: str = "col_") -> Generator[str, None, None]: i = 0 while True: - yield f"col_{i}" + yield f"{prefix}{i}" i = i + 1 @@ -44,4 +45,28 @@ def local_normalized(self) -> ColumnId: return self # == ColumnId(name=self.sql) def __lt__(self, other: ColumnId) -> bool: - return self.name < other.name + return self.sql < other.sql + + +@dataclasses.dataclass(frozen=True) +class SerialColumnId(ColumnId): + """Id that is assigned a unique serial within the tree.""" + + name: str + id: int + + @property + def sql(self) -> str: + """Returns the unescaped SQL name.""" + return f"{self.name}_{self.id}" + + @property + def local_normalized(self) -> ColumnId: + """For use in compiler only. Normalizes to ColumnId referring to sql name.""" + return ColumnId(name=self.sql) + + +# TODO: Create serial ids locally, so can preserve name info +def anonymous_serial_ids() -> Generator[ColumnId, None, None]: + for i in itertools.count(): + yield SerialColumnId("uid", i) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 2e23f529e2..30a130bbac 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -20,7 +20,7 @@ import functools import itertools import typing -from typing import Callable, cast, Iterable, Optional, Sequence, Tuple +from typing import Callable, cast, Iterable, Mapping, Optional, Sequence, Tuple import google.cloud.bigquery as bq @@ -88,6 +88,19 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def row_count(self) -> typing.Optional[int]: return None + @abc.abstractmethod + def remap_refs( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + """Remap variable references""" + ... + + @property + @abc.abstractmethod + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + """The variables defined in this node (as opposed to by child nodes).""" + ... 
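+# Rough intuition, editorial example only: for a ProjectionNode that
+# assigns one new column on top of a two-column child,
+#
+#     tuple(node.ids)        # -> the child's two ids plus the new id
+#     node.node_defined_ids  # -> just the new id
+#
+# The serial-id rewrite keys off this split: fresh ids are minted only at
+# definition sites, while pass-through references are remapped separately
+# via remap_refs.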
+ @functools.cached_property def session(self): sessions = [] @@ -101,6 +114,17 @@ def session(self): return sessions[0] return None + def _validate(self): + """Validate the local data in the node.""" + return + + @functools.cache + def validate_tree(self) -> bool: + for child in self.child_nodes: + child.validate_tree() + self._validate() + return True + def _as_tuple(self) -> Tuple: """Get all fields as tuple.""" return tuple(getattr(self, field.name) for field in fields(self)) @@ -141,6 +165,7 @@ def fields(self) -> Iterable[Field]: @property def ids(self) -> Iterable[bfet_ids.ColumnId]: + """All output ids from the node.""" return (field.id for field in self.fields) @property @@ -220,6 +245,13 @@ def transform_children( """Apply a function to each child node.""" ... + @abc.abstractmethod + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + """Remap defined (in this node only) variables.""" + ... + @property def defines_namespace(self) -> bool: """ @@ -330,6 +362,18 @@ def row_count(self) -> typing.Optional[int]: (self.start, self.stop, self.step), child_length ) + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + @dataclass(frozen=True, eq=False) class JoinNode(BigFrameNode): @@ -338,7 +382,7 @@ class JoinNode(BigFrameNode): conditions: typing.Tuple[typing.Tuple[ex.DerefOp, ex.DerefOp], ...] type: typing.Literal["inner", "outer", "left", "right", "cross"] - def __post_init__(self): + def _validate(self): assert not ( set(self.left_child.ids) & set(self.right_child.ids) ), "Join ids collide" @@ -386,6 +430,10 @@ def row_count(self) -> Optional[int]: return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -397,24 +445,38 @@ def transform_children( return self return transformed - @property - def defines_namespace(self) -> bool: - return True - def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # If this is a cross join, make sure to select at least one column from each side - new_used = used_cols.union( + condition_cols = used_cols.union( map(lambda x: x.id, itertools.chain.from_iterable(self.conditions)) ) - return self.transform_children(lambda x: x.prune(new_used)) + return self.transform_children( + lambda x: x.prune(frozenset([*condition_cols, *used_cols])) + ) + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + new_conds = tuple( + ( + l_cond.remap_column_refs(mappings, allow_partial_bindings=True), + r_cond.remap_column_refs(mappings, allow_partial_bindings=True), + ) + for l_cond, r_cond in self.conditions + ) + return replace(self, conditions=new_conds) # type: ignore @dataclass(frozen=True, eq=False) class ConcatNode(BigFrameNode): # TODO: Explcitly map column ids from each child children: Tuple[BigFrameNode, ...] + output_ids: Tuple[bfet_ids.ColumnId, ...] - def __post_init__(self): + def _validate(self): if len(self.children) == 0: raise ValueError("Concat requires at least one input table. 
Zero provided.") child_schemas = [child.schema.dtypes for child in self.children] @@ -438,8 +500,8 @@ def explicitly_ordered(self) -> bool: def fields(self) -> Iterable[Field]: # TODO: Output names should probably be aligned beforehand or be part of concat definition return ( - Field(bfet_ids.ColumnId(f"column_{i}"), field.dtype) - for i, field in enumerate(self.children[0].fields) + Field(id, field.dtype) + for id, field in zip(self.output_ids, self.children[0].fields) ) @functools.cached_property @@ -457,6 +519,10 @@ def row_count(self) -> Optional[int]: total += count return total + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return self.output_ids + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -470,6 +536,15 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # TODO: Make concat prunable, probably by redefining return self + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_ids = tuple(mappings.get(id, id) for id in self.output_ids) + return replace(self, output_ids=new_ids) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + @dataclass(frozen=True, eq=False) class FromRangeNode(BigFrameNode): @@ -477,6 +552,7 @@ class FromRangeNode(BigFrameNode): start: BigFrameNode end: BigFrameNode step: int + output_id: bfet_ids.ColumnId = bfet_ids.ColumnId("labels") @property def roots(self) -> typing.Set[BigFrameNode]: @@ -496,9 +572,7 @@ def explicitly_ordered(self) -> bool: @functools.cached_property def fields(self) -> Iterable[Field]: - return ( - Field(bfet_ids.ColumnId("labels"), next(iter(self.start.fields)).dtype), - ) + return (Field(self.output_id, next(iter(self.start.fields)).dtype),) @functools.cached_property def variables_introduced(self) -> int: @@ -509,6 +583,14 @@ def variables_introduced(self) -> int: def row_count(self) -> Optional[int]: return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return (self.output_id,) + + @property + def defines_namespace(self) -> bool: + return True + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -522,6 +604,14 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # TODO: Make FromRangeNode prunable (or convert to other node types) return self + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return replace(self, output_id=mappings.get(self.output_id, self.output_id)) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + # Input Nodex # TODO: Most leaf nodes produce fixed column names based on the datasource @@ -595,9 +685,16 @@ def explicitly_ordered(self) -> bool: def row_count(self) -> typing.Optional[int]: return self.n_rows + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return tuple(item.id for item in self.scan_list.items) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: + # Don't preoduce empty scan list no matter what, will result in broken sql syntax + # TODO: Handle more elegantly new_scan_list = ScanList( tuple(item for item in self.scan_list.items if item.id in used_cols) + or (self.scan_list.items[0],) ) return ReadLocalNode( self.feather_bytes, @@ -607,6 +704,20 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: self.session, ) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) 
-> BigFrameNode: + new_scan_list = ScanList( + tuple( + ScanItem(mappings.get(item.id, item.id), item.dtype, item.source_id) + for item in self.scan_list.items + ) + ) + return replace(self, scan_list=new_scan_list) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + @dataclass(frozen=True) class GbqTable: @@ -663,7 +774,7 @@ class ReadTableNode(LeafNode): table_session: bigframes.session.Session = field() - def __post_init__(self): + def _validate(self): # enforce invariants physical_names = set(map(lambda i: i.name, self.source.table.physical_schema)) if not set(scan.source_id for scan in self.scan_list.items).issubset( @@ -728,11 +839,30 @@ def row_count(self) -> typing.Optional[int]: return self.source.table.n_rows return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return tuple(item.id for item in self.scan_list.items) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: new_scan_list = ScanList( tuple(item for item in self.scan_list.items if item.id in used_cols) + or (self.scan_list.items[0],) ) - return ReadTableNode(self.source, new_scan_list, self.table_session) + return replace(self, scan_list=new_scan_list) + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_scan_list = ScanList( + tuple( + ScanItem(mappings.get(item.id, item.id), item.dtype, item.source_id) + for item in self.scan_list.items + ) + ) + return replace(self, scan_list=new_scan_list) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self @dataclass(frozen=True, eq=False) @@ -741,14 +871,6 @@ class CachedTableNode(ReadTableNode): # note: this isn't a "child" node. original_node: BigFrameNode = field() - def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: - new_scan_list = ScanList( - tuple(item for item in self.scan_list.items if item.id in used_cols) - ) - return CachedTableNode( - self.source, new_scan_list, self.table_session, self.original_node - ) - # Unary nodes @dataclass(frozen=True, eq=False) @@ -777,6 +899,10 @@ def variables_introduced(self) -> int: def row_count(self) -> Optional[int]: return self.child.row_count + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return (self.col_id,) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: if self.col_id not in used_cols: return self.child.prune(used_cols) @@ -784,6 +910,14 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: new_used = used_cols.difference([self.col_id]) return self.transform_children(lambda x: x.prune(new_used)) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return replace(self, col_id=mappings.get(self.col_id, self.col_id)) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + @dataclass(frozen=True, eq=False) class FilterNode(UnaryNode): @@ -801,11 +935,28 @@ def variables_introduced(self) -> int: def row_count(self) -> Optional[int]: return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: consumed_ids = used_cols.union(self.predicate.column_references) pruned_child = self.child.prune(consumed_ids) return FilterNode(pruned_child, self.predicate) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: 
Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return replace( + self, + predicate=self.predicate.remap_column_refs( + mappings, allow_partial_bindings=True + ), + ) + @dataclass(frozen=True, eq=False) class OrderByNode(UnaryNode): @@ -828,6 +979,10 @@ def explicitly_ordered(self) -> bool: def row_count(self) -> Optional[int]: return self.child.row_count + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: ordering_cols = itertools.chain.from_iterable( map(lambda x: x.referenced_columns, self.by) @@ -836,6 +991,25 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: pruned_child = self.child.prune(consumed_ids) return OrderByNode(pruned_child, self.by) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + all_refs = set( + itertools.chain.from_iterable(map(lambda x: x.referenced_columns, self.by)) + ) + ref_mapping = {id: ex.DerefOp(mappings[id]) for id in all_refs} + new_by = cast( + tuple[OrderingExpression, ...], + tuple( + by_expr.bind_refs(ref_mapping, allow_partial_bindings=True) + for by_expr in self.by + ), + ) + return replace(self, by=new_by) + @dataclass(frozen=True, eq=False) class ReversedNode(UnaryNode): @@ -855,6 +1029,18 @@ def relation_ops_created(self) -> int: def row_count(self) -> Optional[int]: return self.child.row_count + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + @dataclass(frozen=True, eq=False) class SelectionNode(UnaryNode): @@ -862,6 +1048,11 @@ class SelectionNode(UnaryNode): typing.Tuple[ex.DerefOp, bigframes.core.identifiers.ColumnId], ... 
] + def _validate(self): + for ref, _ in self.input_output_pairs: + if ref.id not in set(self.child.ids): + raise ValueError(f"Reference to column not in child: {ref.id}") + @functools.cached_property def fields(self) -> Iterable[Field]: return tuple( @@ -885,15 +1076,37 @@ def defines_namespace(self) -> bool: def row_count(self) -> Optional[int]: return self.child.row_count + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return tuple(id for _, id in self.input_output_pairs) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: - pruned_selections = tuple( - select for select in self.input_output_pairs if select[1] in used_cols + pruned_selections = ( + tuple( + select for select in self.input_output_pairs if select[1] in used_cols + ) + or self.input_output_pairs[:1] ) consumed_ids = frozenset(i[0].id for i in pruned_selections) pruned_child = self.child.prune(consumed_ids) return SelectionNode(pruned_child, pruned_selections) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_pairs = tuple( + (ref, mappings.get(id, id)) for ref, id in self.input_output_pairs + ) + return replace(self, input_output_pairs=new_pairs) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + new_fields = tuple( + (ex.remap_column_refs(mappings, allow_partial_bindings=True), id) + for ex, id in self.input_output_pairs + ) + return replace(self, input_output_pairs=new_fields) # type: ignore + @dataclass(frozen=True, eq=False) class ProjectionNode(UnaryNode): @@ -903,7 +1116,7 @@ class ProjectionNode(UnaryNode): typing.Tuple[ex.Expression, bigframes.core.identifiers.ColumnId], ... ] - def __post_init__(self): + def _validate(self): input_types = self.child._dtype_lookup for expression, id in self.assignments: # throws TypeError if invalid @@ -933,6 +1146,10 @@ def variables_introduced(self) -> int: def row_count(self) -> Optional[int]: return self.child.row_count + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return tuple(id for _, id in self.assignments) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: pruned_assignments = tuple(i for i in self.assignments if i[1] in used_cols) if len(pruned_assignments) == 0: @@ -943,11 +1160,26 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: pruned_child = self.child.prune(used_cols.union(consumed_ids)) return ProjectionNode(pruned_child, pruned_assignments) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_fields = tuple((ex, mappings.get(id, id)) for ex, id in self.assignments) + return replace(self, assignments=new_fields) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + new_fields = tuple( + (ex.remap_column_refs(mappings, allow_partial_bindings=True), id) + for ex, id in self.assignments + ) + return replace(self, assignments=new_fields) + # TODO: Merge RowCount into Aggregate Node? # Row count can be compute from table metadata sometimes, so it is a bit special. 
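# Editorial sketch of why it is special (the fast path shown is
# hypothetical): an executor can sometimes answer a row count from table
# metadata alone, skipping SQL entirely, roughly:
#
#     if node.child.row_count is not None:   # e.g. ReadTableNode's n_rows
#         return node.child.row_count
#
# A general AggregateNode offers no such metadata shortcut, which is why the
# TODO above about merging the two stays open.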
@dataclass(frozen=True, eq=False) class RowCountNode(UnaryNode): + col_id: bfet_ids.ColumnId = bfet_ids.ColumnId("count") + @property def row_preserving(self) -> bool: return False @@ -958,7 +1190,7 @@ def non_local(self) -> bool: @property def fields(self) -> Iterable[Field]: - return (Field(bfet_ids.ColumnId("count"), bigframes.dtypes.INT_DTYPE),) + return (Field(self.col_id, bigframes.dtypes.INT_DTYPE),) @property def variables_introduced(self) -> int: @@ -972,6 +1204,22 @@ def defines_namespace(self) -> bool: def row_count(self) -> Optional[int]: return 1 + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return (self.col_id,) + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return replace(self, col_id=mappings.get(self.col_id, self.col_id)) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return self + + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: + # TODO: Handle row count pruning + return self + @dataclass(frozen=True, eq=False) class AggregateNode(UnaryNode): @@ -1017,19 +1265,22 @@ def order_ambiguous(self) -> bool: def explicitly_ordered(self) -> bool: return True - @property - def defines_namespace(self) -> bool: - return True - @property def row_count(self) -> Optional[int]: if not self.by_column_ids: return 1 return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return tuple(id for _, id in self.aggregations) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: by_ids = (ref.id for ref in self.by_column_ids) - pruned_aggs = tuple(agg for agg in self.aggregations if agg[1] in used_cols) + pruned_aggs = ( + tuple(agg for agg in self.aggregations if agg[1] in used_cols) + or self.aggregations[:1] + ) agg_inputs = itertools.chain.from_iterable( agg.column_references for agg, _ in pruned_aggs ) @@ -1037,6 +1288,20 @@ def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: pruned_child = self.child.prune(consumed_ids) return AggregateNode(pruned_child, pruned_aggs, self.by_column_ids, self.dropna) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_aggs = tuple((agg, mappings.get(id, id)) for agg, id in self.aggregations) + return replace(self, aggregations=new_aggs) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + new_aggs = tuple( + (agg.remap_column_refs(mappings, allow_partial_bindings=True), id) + for agg, id in self.aggregations + ) + new_by_ids = tuple(id.remap_column_refs(mappings) for id in self.by_column_ids) + return replace(self, by_column_ids=new_by_ids, aggregations=new_aggs) + @dataclass(frozen=True, eq=False) class WindowOpNode(UnaryNode): @@ -1074,14 +1339,38 @@ def added_field(self) -> Field: new_item_dtype = self.op.output_type(input_type) return Field(self.output_name, new_item_dtype) + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return (self.output_name,) + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: if self.output_name not in used_cols: - return self.child - consumed_ids = used_cols.difference([self.output_name]).union( - [self.column_name.id] + return self.child.prune(used_cols) + consumed_ids = ( + used_cols.difference([self.output_name]) + .union([self.column_name.id]) + .union(self.window_spec.all_referenced_columns) ) return self.transform_children(lambda x: x.prune(consumed_ids)) + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, 
bfet_ids.ColumnId] + ) -> BigFrameNode: + return replace( + self, output_name=mappings.get(self.output_name, self.output_name) + ) + + def remap_refs(self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId]): + return replace( + self, + column_name=self.column_name.remap_column_refs( + mappings, allow_partial_bindings=True + ), + window_spec=self.window_spec.remap_column_refs( + mappings, allow_partial_bindings=True + ), + ) + @dataclass(frozen=True, eq=False) class RandomSampleNode(UnaryNode): @@ -1103,6 +1392,20 @@ def variables_introduced(self) -> int: def row_count(self) -> Optional[int]: return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + # TODO: Explode should create a new column instead of overriding the existing one @dataclass(frozen=True, eq=False) @@ -1135,16 +1438,26 @@ def relation_ops_created(self) -> int: def variables_introduced(self) -> int: return len(self.column_ids) + 1 - @property - def defines_namespace(self) -> bool: - return True - @property def row_count(self) -> Optional[int]: return None + @property + def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]: + return () + def prune(self, used_cols: COLUMN_SET) -> BigFrameNode: # Cannot prune explode op - return self.transform_children( - lambda x: x.prune(used_cols.union(ref.id for ref in self.column_ids)) - ) + consumed_ids = used_cols.union(ref.id for ref in self.column_ids) + return self.transform_children(lambda x: x.prune(consumed_ids)) + + def remap_vars( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + return self + + def remap_refs( + self, mappings: Mapping[bfet_ids.ColumnId, bfet_ids.ColumnId] + ) -> BigFrameNode: + new_ids = tuple(id.remap_column_refs(mappings) for id in self.column_ids) + return replace(self, column_ids=new_ids) # type: ignore diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index 8bba7d72b6..acfb2adb3f 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -222,6 +222,11 @@ def _truncate_ordering( class TotalOrdering(RowOrdering): """Immutable object that holds information about the ordering of rows in a ArrayValue object. Guaranteed to be unambiguous.""" + def __post_init__(self): + assert set(ref.id for ref in self.total_ordering_columns).issubset( + self.referenced_columns + ) + # A table has a total ordering defined by the identities of a set of 1 or more columns. # These columns must always be part of the ordering, in order to guarantee that the ordering is total. # Therefore, any modifications(or drops) done to these columns must result in hidden copies being made. 
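A quick sketch of the invariant the new `__post_init__` assertion enforces,
mirroring the construction used by `row_count` in compiled.py above (the
column name is illustrative, and `ascending_over` is assumed to be the same
helper compiled.py uses):

    # Passes: {"count"} is a subset of the ids referenced by the ordering.
    ordering = TotalOrdering(
        ordering_value_columns=(ascending_over("count"),),
        total_ordering_columns=frozenset([ex.deref("count")]),
    )
    # Substituting frozenset([ex.deref("other")]) would trip the assertion,
    # since no ordering expression references "other".
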
diff --git a/bigframes/core/rewrite.py b/bigframes/core/rewrite.py index 9c0eb81450..8187b16d87 100644 --- a/bigframes/core/rewrite.py +++ b/bigframes/core/rewrite.py @@ -16,7 +16,7 @@ import dataclasses import functools import itertools -from typing import cast, Mapping, Optional, Sequence, Tuple +from typing import cast, Generator, Mapping, Optional, Sequence, Tuple import bigframes.core.expression as scalar_exprs import bigframes.core.guid as guids @@ -578,3 +578,39 @@ def convert_complex_slice( ) conditions.append(step_cond) return merge_predicates(conditions) or scalar_exprs.const(True) + + +# TODO: May as well just outright remove selection nodes in this process. +def remap_variables( + root: nodes.BigFrameNode, id_generator: Generator[ids.ColumnId, None, None] +) -> Tuple[nodes.BigFrameNode, dict[ids.ColumnId, ids.ColumnId]]: + """ + Remap all variables in the BFET using the id_generator. + + Note: this will convert a DAG to a tree. + """ + child_replacement_map = dict() + ref_mapping = dict() + # Sequential ids are assigned bottom-up left-to-right + for child in root.child_nodes: + new_child, child_var_mapping = remap_variables(child, id_generator=id_generator) + child_replacement_map[child] = new_child + ref_mapping.update(child_var_mapping) + + # This is actually invalid until we've replaced all of children, refs and var defs + with_new_children = root.transform_children( + lambda node: child_replacement_map[node] + ) + + with_new_refs = with_new_children.remap_refs(ref_mapping) + + node_var_mapping = {old_id: next(id_generator) for old_id in root.node_defined_ids} + with_new_vars = with_new_refs.remap_vars(node_var_mapping) + with_new_vars._validate() + + return ( + with_new_vars, + node_var_mapping + if root.defines_namespace + else (ref_mapping | node_var_mapping), + ) diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 2b9ff65084..d8098f18f7 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -15,7 +15,7 @@ from dataclasses import dataclass import itertools -from typing import Optional, Set, Tuple, Union +from typing import Mapping, Optional, Set, Tuple, Union import bigframes.core.expression as ex import bigframes.core.identifiers as ids @@ -180,3 +180,21 @@ def all_referenced_columns(self) -> Set[ids.ColumnId]: item.scalar_expression.column_references for item in self.ordering ) return set(itertools.chain((i.id for i in self.grouping_keys), ordering_vars)) + + def remap_column_refs( + self, + mapping: Mapping[ids.ColumnId, ids.ColumnId], + allow_partial_bindings: bool = False, + ) -> WindowSpec: + return WindowSpec( + grouping_keys=tuple( + key.remap_column_refs(mapping, allow_partial_bindings) + for key in self.grouping_keys + ), + ordering=tuple( + order_part.remap_column_refs(mapping, allow_partial_bindings) + for order_part in self.ordering + ), + bounds=self.bounds, + min_periods=self.min_periods, + ) From 65e2e70b1aeca22fab09b8f7f3d0279e384294ec Mon Sep 17 00:00:00 2001 From: Huan Chen <142538604+Genesis929@users.noreply.github.com> Date: Wed, 6 Nov 2024 13:37:36 -0800 Subject: [PATCH 12/20] chore: add exec_seconds attribute to ExecutionMetrics (#1104) * chore: score query count assertion fix. * chore: add exec_seconds attribute to ExecutionMetrics * update naming * update logic. * update logic. * update logic. * update logic. 
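For orientation, a hedged sketch of how the new counter is consumed (the
job list is assumed; attribute names match the diff that follows):

    from bigframes.session.metrics import ExecutionMetrics

    metrics = ExecutionMetrics()
    for query_job in completed_query_jobs:  # hypothetical list of QueryJobs
        metrics.count_job_stats(query_job)

    # execution_secs accumulates wall-clock (ended - created) seconds
    # alongside the existing bytes_processed and slot_millis totals.
    print(metrics.execution_count, metrics.slot_millis,
          metrics.bytes_processed, metrics.execution_secs)
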
--- bigframes/session/metrics.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py index 9be7119368..352cd0d892 100644 --- a/bigframes/session/metrics.py +++ b/bigframes/session/metrics.py @@ -29,48 +29,47 @@ class ExecutionMetrics: execution_count: int = 0 slot_millis: int = 0 bytes_processed: int = 0 + execution_secs: float = 0 def count_job_stats(self, query_job: bq_job.QueryJob): stats = get_performance_stats(query_job) if stats is not None: - bytes_processed, slot_millis, exec_seconds = stats + bytes_processed, slot_millis, execution_secs = stats self.execution_count += 1 self.bytes_processed += bytes_processed self.slot_millis += slot_millis + self.execution_secs += execution_secs if LOGGING_NAME_ENV_VAR in os.environ: # when running notebooks via pytest nbmake - write_stats_to_disk(bytes_processed, slot_millis, exec_seconds) + write_stats_to_disk(bytes_processed, slot_millis, execution_secs) def get_performance_stats( query_job: bigquery.QueryJob, -) -> Optional[Tuple[int, int, Optional[float]]]: +) -> Optional[Tuple[int, int, float]]: """Parse the query job for performance stats. Return None if the stats do not reflect real work done in bigquery. """ + + if ( + query_job.configuration.dry_run + or query_job.created is None + or query_job.ended is None + ): + return None + bytes_processed = query_job.total_bytes_processed if not isinstance(bytes_processed, int): return None # filter out mocks - if query_job.configuration.dry_run: - # dry run stats are just predictions of the real run - bytes_processed = 0 slot_millis = query_job.slot_millis if not isinstance(slot_millis, int): return None # filter out mocks - if query_job.configuration.dry_run: - # dry run stats are just predictions of the real run - slot_millis = 0 - - exec_seconds = ( - (query_job.ended - query_job.created).total_seconds() - if query_job.created is not None and query_job.ended is not None - else None - ) + execution_secs = (query_job.ended - query_job.created).total_seconds() - return bytes_processed, slot_millis, exec_seconds + return bytes_processed, slot_millis, execution_secs def write_stats_to_disk( From a38d4c422b6b312f6a54d7b1dd105a474ec2e91a Mon Sep 17 00:00:00 2001 From: Arwa Sharif <146148342+arwas11@users.noreply.github.com> Date: Wed, 6 Nov 2024 16:38:39 -0600 Subject: [PATCH 13/20] docs: update `DataFrame` docstrings to include the errors section (#1127) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * docs: update DataFrame docstrings to include the errors section. * Fix markup * Update third_party/bigframes_vendored/pandas/core/frame.py --------- Co-authored-by: Tim Sweña (Swast) --- .../bigframes_vendored/pandas/core/frame.py | 210 ++++++++++++++++-- .../bigframes_vendored/pandas/core/generic.py | 17 +- 2 files changed, 201 insertions(+), 26 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index a6c11ed1b9..4d71635b57 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -487,6 +487,19 @@ def to_gbq( str: The fully-qualified ID for the written table, in the form ``project.dataset.tablename``. + + Raises: + ValueError: + If an invalid value is provided for ``if_exists`` when ``destination_table`` + is ``None``. ``None`` or ``replace`` are the only valid values for ``if_exists``. 
+ ValueError: + If an invalid value is provided for ``destination_table`` that is + not one of ``datasetID.tableId`` or ``projectId.datasetId.tableId``. + ValueError: + If an invalid value is provided for ``if_exists`` that is not one of + ``fail``, ``replace``, or ``append``. + + """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -531,7 +544,13 @@ def to_parquet( If ``False``, they will not be written to the file. Returns: - None or bytes: bytes if no path argument is provided else None + None or bytes: + bytes if no path argument is provided else None + + Raises: + ValueError: + If an invalid value provided for `compression` that is not one of + ``None``, ``snappy``, or ``gzip``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1140,9 +1159,11 @@ def insert(self, loc, column, value, allow_duplicates=False): Allow duplicate column labels to be created. Raises: + IndexError: + If ``column`` index is out of bounds with the total count of columns. ValueError: - If `column` is already contained in the DataFrame, - unless `allow_duplicates` is set to True. + If ``column`` is already contained in the DataFrame, + unless ``allow_duplicates`` is set to True. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1259,10 +1280,14 @@ def drop( level: For MultiIndex, level from which the labels will be removed. Returns: - bigframes.pandas.DataFrame: DataFrame without the removed column labels. + bigframes.pandas.DataFrame: + DataFrame without the removed column labels. Raises: KeyError: If any of the labels is not found in the selected axis. + ValueError: If values for both ``labels`` and ``index``/``columns`` are provided. + ValueError: If a multi-index tuple is provided as ``level``. + ValueError: If either ``labels`` or ``index``/``columns`` is not provided. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1419,7 +1444,12 @@ def set_index( Delete columns to be used as the new index. Returns: - bigframes.pandas.DataFrame: Changed row labels. + bigframes.pandas.DataFrame: + Changed row labels. + + Raises: + KeyError: + If key(s) are not in the columns. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1437,7 +1467,12 @@ def reorder_levels( Where to reorder levels. Returns: - bigframes.pandas.DataFrame: DataFrame of rearranged index. + bigframes.pandas.DataFrame: + DataFrame of rearranged index. + + Raises: + ValueError: + If columns are not multi-index. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1455,7 +1490,12 @@ def swaplevel(self, i, j, axis: str | int = 0) -> DataFrame: 'columns' for column-wise. Returns: - bigframes.pandas.DataFrame: DataFrame with levels swapped in MultiIndex. + bigframes.pandas.DataFrame: + DataFrame with levels swapped in MultiIndex. + + Raises: + ValueError: + If columns are not multi-index. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1474,7 +1514,12 @@ def droplevel(self, level, axis: str | int = 0): * 0 or 'index': remove level(s) in column. * 1 or 'columns': remove level(s) in row. Returns: - bigframes.pandas.DataFrame: DataFrame with requested index / column level(s) removed. + bigframes.pandas.DataFrame: + DataFrame with requested index / column level(s) removed. 
+ + Raises: + ValueError: + If columns are not multi-index """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1724,7 +1769,12 @@ def dropna( Returns: - bigframes.pandas.DataFrame: DataFrame with NA entries dropped from it. + bigframes.pandas.DataFrame: + DataFrame with NA entries dropped from it. + + Raises: + ValueError: + If ``how`` is not one of ``any`` or ``all``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -1772,8 +1822,13 @@ def isin(self, values): the column names, which must match. Returns: - bigframes.pandas.DataFrame: DataFrame of booleans showing whether each element - in the DataFrame is contained in values. + bigframes.pandas.DataFrame: + DataFrame of booleans showing whether each element + in the DataFrame is contained in values. + + Raises: + TypeError: + If values provided are not list-like objects. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -2004,7 +2059,12 @@ def sort_values( if `first`; `last` puts NaNs at the end. Returns: - bigframes.pandas.DataFrame: DataFrame with sorted values. + bigframes.pandas.DataFrame: + DataFrame with sorted values. + + Raises: + ValueError: + If value of ``na_position`` is not one of ``first`` or ``last``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -2014,7 +2074,14 @@ def sort_index( """Sort object by labels (along an axis). Returns: - bigframes.pandas.DataFrame: The original DataFrame sorted by the labels. + bigframes.pandas.DataFrame: + The original DataFrame sorted by the labels. + + Raises: + ValueError: + If value of ``na_position`` is not one of ``first`` or ``last``. + ValueError: + If length of ``ascending`` dose not equal length of ``by``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -3727,7 +3794,12 @@ def combine( overwritten with NaNs. Returns: - bigframes.pandas.DataFrame: Combination of the provided DataFrames. + bigframes.pandas.DataFrame: + Combination of the provided DataFrames. + + Raises: + ValueError: + If ``func`` return value is not Series. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -3813,8 +3885,17 @@ def explode( If True, the resulting index will be labeled 0, 1, …, n - 1. Returns: - bigframes.pandas.DataFrame: Exploded lists to rows of the subset columns; + bigframes.pandas.DataFrame: + Exploded lists to rows of the subset columns; index will be duplicated for these rows. + + Raises: + ValueError: + * If columns of the frame are not unique. + * If specified columns to explode is empty list. + * If specified columns to explode have not matching count of elements rowwise in the frame. + KeyError: + If incorrect column names are provided """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -3929,6 +4010,10 @@ def update( Returns: None: This method directly changes calling object. + + Raises: + ValueError: + If a type of join other than ``left`` is provided as an argument. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -4023,7 +4108,14 @@ def groupby( values will also be treated as the key in groups. Returns: - bigframes.core.groupby.SeriesGroupBy: A groupby object that contains information about the groups. + bigframes.core.groupby.SeriesGroupBy: + A groupby object that contains information about the groups. + + Raises: + ValueError: + If both ``by`` and ``level`` are specified. + TypeError: + If one of ``by`` or `level`` is not specified. 
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -4109,7 +4201,14 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: values, without passing them to func. Returns: - bigframes.pandas.DataFrame: Transformed DataFrame. + bigframes.pandas.DataFrame: + Transformed DataFrame. + + Raises: + TypeError: + If value provided for ``func`` is not callable. + ValueError: + If value provided for ``na_action`` is not ``None`` or ``ignore``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -4209,7 +4308,18 @@ def join(self, other, *, on: Optional[str] = None, how: str) -> DataFrame: the order of the left keys. Returns: - bigframes.pandas.DataFrame: A dataframe containing columns from both the caller and `other`. + bigframes.pandas.DataFrame: + A dataframe containing columns from both the caller and `other`. + + Raises: + ValueError: + If value for ``on`` is specified for cross join. + ValueError: + If join on columns does not match the index level of the other + DataFrame. Join on columns with multi-index is not supported. + ValueError: + If left index to join on does not have the same number of levels + as the right index. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -4354,7 +4464,20 @@ def merge( no suffix. At least one of the values must not be None. Returns: - bigframes.pandas.DataFrame: A DataFrame of the two merged objects. + bigframes.pandas.DataFrame: + A DataFrame of the two merged objects. + + Raises: + ValueError: + If value for ``on`` is specified for cross join. + ValueError: + If ``on`` or ``left_on`` + ``right_on`` are not specified when ``on`` is ``None``. + ValueError: + If ``on`` and ``left_on`` + ``right_on`` are specified when ``on`` is not ``None``. + ValueError: + If no column with the provided label is found in ``self`` for left join. + ValueError: + If no column with the provided label is found in ``self`` for right join. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -4469,6 +4592,16 @@ def apply(self, func, *, axis=0, args=(), **kwargs): Returns: bigframes.pandas.DataFrame or bigframes.pandas.Series: Result of applying ``func`` along the given axis of the DataFrame. + + Raises: + ValueError: + If a remote function is not provided when ``axis=1`` is specified. + ValueError: + If number or input params in the remote function are not the same as + the number of columns in the dataframe. + ValueError: + If the dtypes of the columns in the dataframe are not compatible with + the data types of the remote function input params. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -5156,7 +5289,12 @@ def nlargest(self, n: int, columns, keep: str = "first"): selecting more than `n` items. Returns: - bigframes.pandas.DataFrame: The first `n` rows ordered by the given columns in descending order. + bigframes.pandas.DataFrame: + The first `n` rows ordered by the given columns in descending order. + + Raises: + ValueError: + If value of ``keep`` is not ``first``, ``last``, or ``all``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -5244,7 +5382,12 @@ def nsmallest(self, n: int, columns, keep: str = "first"): selecting more than `n` items. Returns: - bigframes.pandas.DataFrame: The first `n` rows ordered by the given columns in ascending order. + bigframes.pandas.DataFrame: + The first `n` rows ordered by the given columns in ascending order. 
+ + Raises: + ValueError: + If value of ``keep`` is not ``first``, ``last``, or ``all``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -5513,7 +5656,12 @@ def cumsum(self) -> DataFrame: [3 rows x 2 columns] Returns: - bigframes.pandas.DataFrame: Return cumulative sum of DataFrame. + bigframes.pandas.DataFrame: + Return cumulative sum of DataFrame. + + Raises: + ValueError: + If values are not of numeric type. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -5545,7 +5693,12 @@ def cumprod(self) -> DataFrame: [3 rows x 2 columns] Returns: - bigframes.pandas.DataFrame: Return cumulative product of DataFrame. + bigframes.pandas.DataFrame: + Return cumulative product of DataFrame. + + Raises: + ValueError: + If values are not of numeric type. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -5695,7 +5848,12 @@ def describe(self): [8 rows x 2 columns] Returns: - bigframes.pandas.DataFrame: Summary statistics of the Series or Dataframe provided. + bigframes.pandas.DataFrame: + Summary statistics of the Series or Dataframe provided. + + Raises: + ValueError: + If unsupported ``include`` type is provided. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -6679,6 +6837,10 @@ def dot(self, other): If `other` is a Series, return the matrix product between self and other as a Series. If other is a DataFrame, return the matrix product of self and other in a DataFrame. + + Raises: + RuntimeError: + If unable to construct all columns. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 0ac527e2ff..101cdc5bd9 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -275,8 +275,13 @@ def to_json( list-like. Returns: - None or str: If path_or_buf is None, returns the resulting json format as a - string. Otherwise returns None. + None or str: + If path_or_buf is None, returns the resulting json format as a + string. Otherwise returns None. + + Raises: + ValueError: + If ``lines`` is True but ``records`` is not provided as value for ``orient``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -594,6 +599,10 @@ def sample( bigframes.pandas.DataFrame or bigframes.pandas.Series: A new object of same type as caller containing `n` items randomly sampled from the caller object. + + Raises: + ValueError: + If both ``n`` and ``frac`` are specified. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -908,6 +917,10 @@ def filter( Returns: bigframes.pandas.DataFrame or bigframes.pandas.Series: Same type as input object. + + Raises: + ValueError: + If value provided is not exactly one of ``items``, ``like``, or ``regex``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 0dea9413ff9f296883a3df9dd212947bb6a170aa Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 6 Nov 2024 17:26:10 -0800 Subject: [PATCH 14/20] test: add `requests` dependency to the anthropic remote function (#1137) * test: add `requests` dependecy to the anthropic remote function The latest anthropic release broke the notebook tests. More details in https://github.com/anthropics/anthropic-sdk-python/issues/738. 
* fix tupo --- .../remote_functions/remote_function_vertex_claude_model.ipynb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/notebooks/remote_functions/remote_function_vertex_claude_model.ipynb b/notebooks/remote_functions/remote_function_vertex_claude_model.ipynb index a5769a2285..b6c9029a01 100644 --- a/notebooks/remote_functions/remote_function_vertex_claude_model.ipynb +++ b/notebooks/remote_functions/remote_function_vertex_claude_model.ipynb @@ -264,7 +264,8 @@ } ], "source": [ - "@bpd.remote_function(packages=[\"anthropic[vertex]\"], max_batching_rows=1, \n", + "@bpd.remote_function(packages=[\"anthropic[vertex]\", \"google-auth[requests]\"],\n", + " max_batching_rows=1, \n", " bigquery_connection=\"bigframes-dev.us-east5.bigframes-rf-conn\") # replace with your connection\n", "def anthropic_transformer(message: str) -> str:\n", " from anthropic import AnthropicVertex\n", From 07bf2d4c08e8d10d272337d418ce56f32988b022 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Wed, 6 Nov 2024 21:36:31 -0800 Subject: [PATCH 15/20] test: Add data file for JSON dtype system tests (#1136) --- tests/data/json.jsonl | 16 ++++++++++++++++ tests/data/json_schema.json | 12 ++++++++++++ tests/system/conftest.py | 25 +++++++++++++++++++++++++ tests/system/small/test_series.py | 7 +++++++ 4 files changed, 60 insertions(+) create mode 100644 tests/data/json.jsonl create mode 100644 tests/data/json_schema.json diff --git a/tests/data/json.jsonl b/tests/data/json.jsonl new file mode 100644 index 0000000000..fbf0593612 --- /dev/null +++ b/tests/data/json.jsonl @@ -0,0 +1,16 @@ +{"rowindex": 0, "json_col": null} +{"rowindex": 1, "json_col": true} +{"rowindex": 2, "json_col": 100} +{"rowindex": 3, "json_col": 0.98} +{"rowindex": 4, "json_col": "a string"} +{"rowindex": 5, "json_col": []} +{"rowindex": 6, "json_col": [1, 2, 3]} +{"rowindex": 7, "json_col": [{"a": 1}, {"a": 2}, {"a": null}, {}]} +{"rowindex": 8, "json_col": {"bool_value": true}} +{"rowindex": 9, "json_col": {"folat_num": 3.14159}} +{"rowindex": 10, "json_col": {"date": "2024-07-16"}} +{"rowindex": 11, "json_col": {"null_filed": null}} +{"rowindex": 12, "json_col": {"int_value": 2, "null_filed": null}} +{"rowindex": 13, "json_col": {"list_data": [10, 20, 30]}} +{"rowindex": 14, "json_col": {"person": {"name": "Alice", "age": 35}}} +{"rowindex": 15, "json_col": {"order": {"items": ["book", "pen"], "total": 15.99}}} diff --git a/tests/data/json_schema.json b/tests/data/json_schema.json new file mode 100644 index 0000000000..6bbbf5ca55 --- /dev/null +++ b/tests/data/json_schema.json @@ -0,0 +1,12 @@ +[ + { + "name": "rowindex", + "type": "INTEGER", + "mode": "REQUIRED" + }, + { + "name": "json_col", + "type": "JSON", + "mode": "NULLABLE" + } +] diff --git a/tests/system/conftest.py b/tests/system/conftest.py index ba8f350c73..5e95b8ee5d 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -298,6 +298,7 @@ def load_test_data_tables( ("nested", "nested_schema.json", "nested.jsonl"), ("nested_structs", "nested_structs_schema.json", "nested_structs.jsonl"), ("repeated", "repeated_schema.json", "repeated.jsonl"), + ("json", "json_schema.json", "json.jsonl"), ("penguins", "penguins_schema.json", "penguins.jsonl"), ("time_series", "time_series_schema.json", "time_series.jsonl"), ("hockey_players", "hockey_players.json", "hockey_players.jsonl"), @@ -384,6 +385,11 @@ def repeated_table_id(test_data_tables) -> str: return test_data_tables["repeated"] 
+@pytest.fixture(scope="session") +def json_table_id(test_data_tables) -> str: + return test_data_tables["json"] + + @pytest.fixture(scope="session") def penguins_table_id(test_data_tables) -> str: return test_data_tables["penguins"] @@ -481,6 +487,25 @@ def repeated_pandas_df() -> pd.DataFrame: return df +@pytest.fixture(scope="session") +def json_df( + json_table_id: str, session: bigframes.Session +) -> bigframes.dataframe.DataFrame: + """Returns a DataFrame containing columns of JSON type.""" + return session.read_gbq(json_table_id, index_col="rowindex") + + +@pytest.fixture(scope="session") +def json_pandas_df() -> pd.DataFrame: + """Returns a DataFrame containing columns of JSON type.""" + df = pd.read_json( + DATA_DIR / "json.jsonl", + lines=True, + ) + df = df.set_index("rowindex") + return df + + @pytest.fixture(scope="session") def scalars_df_default_index( scalars_df_index: bigframes.dataframe.DataFrame, diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 696501d1b9..b906f452b7 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -278,6 +278,13 @@ def test_get_column(scalars_dfs, col_name, expected_dtype): assert series_pandas.shape[0] == scalars_pandas_df.shape[0] +def test_get_column_w_json(json_df, json_pandas_df): + series = json_df["json_col"] + series_pandas = series.to_pandas() + assert series.dtype == pd.StringDtype(storage="pyarrow") + assert series_pandas.shape[0] == json_pandas_df.shape[0] + + def test_series_get_column_default(scalars_dfs): scalars_df, _ = scalars_dfs result = scalars_df.get(123123123123123, "default_val") From 4ef8bacdcc5447ba53c0f354526346f4dec7c5a1 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 7 Nov 2024 02:25:03 -0800 Subject: [PATCH 16/20] feat: support `json_extract_string_array` in the `bigquery` module (#1131) --- bigframes/bigquery/__init__.py | 2 + bigframes/bigquery/_operations/json.py | 119 ++++++++++++++++--- bigframes/core/compile/scalar_op_compiler.py | 14 +++ bigframes/operations/__init__.py | 37 ++++-- tests/system/small/bigquery/test_json.py | 40 ++++++- 5 files changed, 188 insertions(+), 24 deletions(-) diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index 0b2d2d5aeb..a39914d6e7 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -25,6 +25,7 @@ from bigframes.bigquery._operations.json import ( json_extract, json_extract_array, + json_extract_string_array, json_set, ) from bigframes.bigquery._operations.search import create_vector_index, vector_search @@ -37,6 +38,7 @@ "json_set", "json_extract", "json_extract_array", + "json_extract_string_array", "approx_top_count", "struct", "create_vector_index", diff --git a/bigframes/bigquery/_operations/json.py b/bigframes/bigquery/_operations/json.py index d3c3c97a9c..152c93186a 100644 --- a/bigframes/bigquery/_operations/json.py +++ b/bigframes/bigquery/_operations/json.py @@ -21,14 +21,17 @@ from __future__ import annotations -from typing import Any, Sequence, Tuple +from typing import Any, cast, Optional, Sequence, Tuple, Union +import bigframes.dtypes import bigframes.operations as ops import bigframes.series as series +from . 
import array + def json_set( - series: series.Series, + input: series.Series, json_path_value_pairs: Sequence[Tuple[str, Any]], ) -> series.Series: """Produces a new JSON value within a Series by inserting or replacing values at @@ -47,7 +50,7 @@ def json_set( Name: data, dtype: string Args: - series (bigframes.series.Series): + input (bigframes.series.Series): The Series containing JSON data (as native JSON objects or JSON-formatted strings). json_path_value_pairs (Sequence[Tuple[str, Any]]): Pairs of JSON path and the new value to insert/replace. @@ -59,6 +62,7 @@ def json_set( # SQLGlot parser does not support the "create_if_missing => true" syntax, so # create_if_missing is not currently implemented. + result = input for json_path_value_pair in json_path_value_pairs: if len(json_path_value_pair) != 2: raise ValueError( @@ -67,14 +71,14 @@ def json_set( ) json_path, json_value = json_path_value_pair - series = series._apply_binary_op( + result = result._apply_binary_op( json_value, ops.JSONSet(json_path=json_path), alignment="left" ) - return series + return result def json_extract( - series: series.Series, + input: series.Series, json_path: str, ) -> series.Series: """Extracts a JSON value and converts it to a SQL JSON-formatted `STRING` or `JSON` @@ -93,7 +97,7 @@ def json_extract( dtype: string Args: - series (bigframes.series.Series): + input (bigframes.series.Series): The Series containing JSON data (as native JSON objects or JSON-formatted strings). json_path (str): The JSON path identifying the data that you want to obtain from the input. @@ -101,16 +105,16 @@ def json_extract( Returns: bigframes.series.Series: A new Series with the JSON or JSON-formatted STRING. """ - return series._apply_unary_op(ops.JSONExtract(json_path=json_path)) + return input._apply_unary_op(ops.JSONExtract(json_path=json_path)) def json_extract_array( - series: series.Series, + input: series.Series, json_path: str = "$", ) -> series.Series: - """Extracts a JSON array and converts it to a SQL array of JSON-formatted `STRING` or `JSON` - values. This function uses single quotes and brackets to escape invalid JSONPath - characters in JSON keys. + """Extracts a JSON array and converts it to a SQL array of JSON-formatted + `STRING` or `JSON` values. This function uses single quotes and brackets to + escape invalid JSONPath characters in JSON keys. **Examples:** @@ -124,13 +128,98 @@ def json_extract_array( 1 ['4' '5'] dtype: list[pyarrow] + >>> s = bpd.Series([ + ... '{"fruits": [{"name": "apple"}, {"name": "cherry"}]}', + ... '{"fruits": [{"name": "guava"}, {"name": "grapes"}]}' + ... ]) + >>> bbq.json_extract_array(s, "$.fruits") + 0 ['{"name":"apple"}' '{"name":"cherry"}'] + 1 ['{"name":"guava"}' '{"name":"grapes"}'] + dtype: list[pyarrow] + + >>> s = bpd.Series([ + ... '{"fruits": {"color": "red", "names": ["apple","cherry"]}}', + ... '{"fruits": {"color": "green", "names": ["guava", "grapes"]}}' + ... ]) + >>> bbq.json_extract_array(s, "$.fruits.names") + 0 ['"apple"' '"cherry"'] + 1 ['"guava"' '"grapes"'] + dtype: list[pyarrow] + Args: - series (bigframes.series.Series): + input (bigframes.series.Series): The Series containing JSON data (as native JSON objects or JSON-formatted strings). json_path (str): The JSON path identifying the data that you want to obtain from the input. Returns: - bigframes.series.Series: A new Series with the JSON or JSON-formatted STRING. + bigframes.series.Series: A new Series with the parsed arrays from the input. 
""" - return series._apply_unary_op(ops.JSONExtractArray(json_path=json_path)) + return input._apply_unary_op(ops.JSONExtractArray(json_path=json_path)) + + +def json_extract_string_array( + input: series.Series, + json_path: str = "$", + value_dtype: Optional[ + Union[bigframes.dtypes.Dtype, bigframes.dtypes.DtypeString] + ] = None, +) -> series.Series: + """Extracts a JSON array and converts it to a SQL array of `STRING` values. + A `value_dtype` can be provided to further coerce the data type of the + values in the array. This function uses single quotes and brackets to escape + invalid JSONPath characters in JSON keys. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import bigframes.bigquery as bbq + >>> bpd.options.display.progress_bar = None + + >>> s = bpd.Series(['[1, 2, 3]', '[4, 5]']) + >>> bbq.json_extract_string_array(s) + 0 ['1' '2' '3'] + 1 ['4' '5'] + dtype: list[pyarrow] + + >>> bbq.json_extract_string_array(s, value_dtype='Int64') + 0 [1 2 3] + 1 [4 5] + dtype: list[pyarrow] + + >>> s = bpd.Series([ + ... '{"fruits": {"color": "red", "names": ["apple","cherry"]}}', + ... '{"fruits": {"color": "green", "names": ["guava", "grapes"]}}' + ... ]) + >>> bbq.json_extract_string_array(s, "$.fruits.names") + 0 ['apple' 'cherry'] + 1 ['guava' 'grapes'] + dtype: list[pyarrow] + + Args: + input (bigframes.series.Series): + The Series containing JSON data (as native JSON objects or JSON-formatted strings). + json_path (str): + The JSON path identifying the data that you want to obtain from the input. + value_dtype (dtype, Optional): + The data type supported by BigFrames DataFrame. + + Returns: + bigframes.series.Series: A new Series with the parsed arrays from the input. + """ + array_series = input._apply_unary_op( + ops.JSONExtractStringArray(json_path=json_path) + ) + if value_dtype not in [None, bigframes.dtypes.STRING_DTYPE]: + array_items_series = array_series.explode() + if value_dtype == bigframes.dtypes.BOOL_DTYPE: + array_items_series = array_items_series.str.lower() == "true" + else: + array_items_series = array_items_series.astype(value_dtype) + array_series = cast( + series.Series, + array.array_agg( + array_items_series.groupby(level=input.index.names, dropna=False) + ), + ) + return array_series diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 729b341e85..80e354aa8c 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -1140,6 +1140,13 @@ def json_extract_array_op_impl(x: ibis_types.Value, op: ops.JSONExtractArray): return json_extract_array(json_obj=x, json_path=op.json_path) +@scalar_op_compiler.register_unary_op(ops.JSONExtractStringArray, pass_op=True) +def json_extract_string_array_op_impl( + x: ibis_types.Value, op: ops.JSONExtractStringArray +): + return json_extract_string_array(json_obj=x, json_path=op.json_path) + + ### Binary Ops def short_circuit_nulls(type_override: typing.Optional[ibis_dtypes.DataType] = None): """Wraps a binary operator to generate nulls of the expected type if either input is a null scalar.""" @@ -1801,6 +1808,13 @@ def json_extract_array( """Extracts a JSON array and converts it to a SQL ARRAY of JSON-formatted STRINGs or JSON values.""" +@ibis.udf.scalar.builtin(name="json_extract_string_array") +def json_extract_string_array( + json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.str +) -> ibis_dtypes.Array[ibis_dtypes.String]: + """Extracts a JSON array and converts it to a SQL ARRAY of STRINGs.""" + + 
 @ibis.udf.scalar.builtin(name="ML.DISTANCE")
 def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64:
     """Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")"""
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index 63127a70de..2e2e4a0552 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -25,7 +25,6 @@
 from pandas.tseries.offsets import DateOffset
 import pyarrow as pa
 
-import bigframes.dtypes
 import bigframes.dtypes as dtypes
 import bigframes.operations.type as op_typing
 
@@ -526,6 +525,13 @@ class RemoteFunctionOp(UnaryOp):
     def output_type(self, *input_types):
         # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
         if hasattr(self.func, "output_dtype"):
+            if dtypes.is_array_like(self.func.output_dtype):
+                # TODO(b/284515241): remove this special handling to support
+                # array output types once BQ remote functions support ARRAY.
+                # Until then, use json serialized strings at the remote function
+                # level, and parse that to the intended output type at the
+                # bigframes level.
+                return dtypes.STRING_DTYPE
             return self.func.output_dtype
         else:
             raise AttributeError("output_dtype not defined")
@@ -548,9 +554,9 @@ class ToDatetimeOp(UnaryOp):
 
     def output_type(self, *input_types):
         if input_types[0] not in (
-            bigframes.dtypes.FLOAT_DTYPE,
-            bigframes.dtypes.INT_DTYPE,
-            bigframes.dtypes.STRING_DTYPE,
+            dtypes.FLOAT_DTYPE,
+            dtypes.INT_DTYPE,
+            dtypes.STRING_DTYPE,
         ):
             raise TypeError("expected string or numeric input")
         return pd.ArrowDtype(pa.timestamp("us", tz=None))
@@ -565,9 +571,9 @@ class ToTimestampOp(UnaryOp):
     def output_type(self, *input_types):
         # Must be numeric or string
         if input_types[0] not in (
-            bigframes.dtypes.FLOAT_DTYPE,
-            bigframes.dtypes.INT_DTYPE,
-            bigframes.dtypes.STRING_DTYPE,
+            dtypes.FLOAT_DTYPE,
+            dtypes.INT_DTYPE,
+            dtypes.STRING_DTYPE,
         ):
             raise TypeError("expected string or numeric input")
         return pd.ArrowDtype(pa.timestamp("us", tz="UTC"))
@@ -699,6 +705,23 @@ def output_type(self, *input_types):
         )
 
 
+@dataclasses.dataclass(frozen=True)
+class JSONExtractStringArray(UnaryOp):
+    name: typing.ClassVar[str] = "json_extract_string_array"
+    json_path: str
+
+    def output_type(self, *input_types):
+        input_type = input_types[0]
+        if not dtypes.is_json_like(input_type):
+            raise TypeError(
+                "Input type must be a valid JSON object or JSON-formatted string type."
+ + f" Received type: {input_type}" + ) + return pd.ArrowDtype( + pa.list_(dtypes.bigframes_dtype_to_arrow_dtype(dtypes.STRING_DTYPE)) + ) + + # Binary Ops fillna_op = create_binary_op(name="fillna", type_signature=op_typing.COERCE) maximum_op = create_binary_op(name="maximum", type_signature=op_typing.COERCE) diff --git a/tests/system/small/bigquery/test_json.py b/tests/system/small/bigquery/test_json.py index 68356f4a15..75b9345107 100644 --- a/tests/system/small/bigquery/test_json.py +++ b/tests/system/small/bigquery/test_json.py @@ -19,6 +19,7 @@ import pytest import bigframes.bigquery as bbq +import bigframes.dtypes import bigframes.pandas as bpd @@ -142,9 +143,9 @@ def test_json_extract_w_invalid_series_type(): def test_json_extract_array_from_json_strings(): - s = bpd.Series(['{"a": [1, 2, 3]}', '{"a": []}', '{"a": [4,5]}']) + s = bpd.Series(['{"a": ["ab", "2", "3 xy"]}', '{"a": []}', '{"a": ["4","5"]}']) actual = bbq.json_extract_array(s, "$.a") - expected = bpd.Series([["1", "2", "3"], [], ["4", "5"]]) + expected = bpd.Series([['"ab"', '"2"', '"3 xy"'], [], ['"4"', '"5"']]) pd.testing.assert_series_equal( actual.to_pandas(), expected.to_pandas(), @@ -164,3 +165,38 @@ def test_json_extract_array_from_array_strings(): def test_json_extract_array_w_invalid_series_type(): with pytest.raises(TypeError): bbq.json_extract_array(bpd.Series([1, 2])) + + +def test_json_extract_string_array_from_json_strings(): + s = bpd.Series(['{"a": ["ab", "2", "3 xy"]}', '{"a": []}', '{"a": ["4","5"]}']) + actual = bbq.json_extract_string_array(s, "$.a") + expected = bpd.Series([["ab", "2", "3 xy"], [], ["4", "5"]]) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +def test_json_extract_string_array_from_array_strings(): + s = bpd.Series(["[1, 2, 3]", "[]", "[4,5]"]) + actual = bbq.json_extract_string_array(s) + expected = bpd.Series([["1", "2", "3"], [], ["4", "5"]]) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +def test_json_extract_string_array_as_float_array_from_array_strings(): + s = bpd.Series(["[1, 2.5, 3]", "[]", "[4,5]"]) + actual = bbq.json_extract_string_array(s, value_dtype=bigframes.dtypes.FLOAT_DTYPE) + expected = bpd.Series([[1, 2.5, 3], [], [4, 5]]) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + ) + + +def test_json_extract_string_array_w_invalid_series_type(): + with pytest.raises(TypeError): + bbq.json_extract_string_array(bpd.Series([1, 2])) From 4873f9dd48a6e7a967300f9382dc359ff3b7e0d0 Mon Sep 17 00:00:00 2001 From: Arwa Sharif <146148342+arwas11@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:11:52 -0600 Subject: [PATCH 17/20] chore: fix docstrings typos (#1134) * chore: fix docstring typos * fix docstrings typos --- notebooks/remote_functions/remote_function.ipynb | 2 +- third_party/bigframes_vendored/pandas/core/frame.py | 6 +++--- third_party/bigframes_vendored/pandas/core/series.py | 8 ++++---- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/notebooks/remote_functions/remote_function.ipynb b/notebooks/remote_functions/remote_function.ipynb index 063c1738b4..1c1048d356 100644 --- a/notebooks/remote_functions/remote_function.ipynb +++ b/notebooks/remote_functions/remote_function.ipynb @@ -903,7 +903,7 @@ } ], "source": [ - "# Let's try to simulate a scenario in which user shares this remote funciton to\n", + "# Let's try to simulate a scenario in which user shares this remote function to\n", "# their colleague who simply wants to reuse it. 
BigFrames provides an API to do\n", "# so via `read_gbq_function`. Usage details are available via `help` command.\n", "help(pd.read_gbq_function)" diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 4d71635b57..70da1a5c4c 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -2649,7 +2649,7 @@ def add(self, other, axis: str | int = "columns") -> DataFrame: raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) def __add__(self, other) -> DataFrame: - """Get addition of DataFrame and other, column-wise, using arithmatic + """Get addition of DataFrame and other, column-wise, using arithmetic operator `+`. Equivalent to ``DataFrame.add(other)``. @@ -3308,7 +3308,7 @@ def floordiv(self, other, axis: str | int = "columns") -> DataFrame: def __floordiv__(self, other): """ - Get integer divison of DataFrame by other, using arithmatic operator `//`. + Get integer division of DataFrame by other, using arithmetic operator `//`. Equivalent to `DataFrame.floordiv(other)`. @@ -4358,7 +4358,7 @@ def merge( >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None - Merge DataFrames df1 and df2 by specifiying type of merge: + Merge DataFrames df1 and df2 by specifying type of merge: >>> df1 = bpd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]}) >>> df1 diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index a3b85205a9..7c8f452a8f 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -1191,7 +1191,7 @@ def apply( >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None - For applying arbitrary python function a `remote_funciton` is recommended. + For applying arbitrary python function a `remote_function` is recommended. Let's use ``reuse=False`` flag to make sure a new `remote_function` is created every time we run the following code, but you can skip it to potentially reuse a previously deployed `remote_function` from @@ -2663,7 +2663,7 @@ def floordiv(self, other) -> Series: def __floordiv__(self, other): """ - Get integer divison of Series by other, using arithmatic operator `//`. + Get integer division of Series by other, using arithmetic operator `//`. Equivalent to `Series.floordiv(other)`. @@ -2716,7 +2716,7 @@ def rfloordiv(self, other) -> Series: def __rfloordiv__(self, other): """ - Get integer divison of other by Series, using arithmatic operator `//`. + Get integer division of other by Series, using arithmetic operator `//`. Equivalent to `Series.rfloordiv(other)`. @@ -2725,7 +2725,7 @@ def __rfloordiv__(self, other): Object to divide by the Series. Returns: - Series: The result of the integer divison. + Series: The result of the integer division. 
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 7ac6639fb0e8baf5fb3adf5785dffd8cf9b06702 Mon Sep 17 00:00:00 2001 From: rey-esp Date: Mon, 11 Nov 2024 10:41:59 -0600 Subject: [PATCH 18/20] docs: add file for Classification with a Boosted Treed Model and snippet for preparing sample data (#1135) * docs: add snippet for Linear Regression tutorial Explain Prediction section * add snippet that prepares census sample data * remove file changes * rename file due to typo --- .../classification_boosted_tree_model_test.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 samples/snippets/classification_boosted_tree_model_test.py diff --git a/samples/snippets/classification_boosted_tree_model_test.py b/samples/snippets/classification_boosted_tree_model_test.py new file mode 100644 index 0000000000..fbc9369dde --- /dev/null +++ b/samples/snippets/classification_boosted_tree_model_test.py @@ -0,0 +1,42 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def test_boosted_tree_model(random_model_id: str) -> None: + # your_model_id = random_model_id + # [START bigquery_dataframes_bqml_boosted_tree_prepare] + import bigframes.pandas as bpd + + input_data = bpd.read_gbq( + "bigquery-public-data.ml_datasets.census_adult_income", + columns=( + "age", + "workclass", + "marital_status", + "education_num", + "occupation", + "hours_per_week", + "income_bracket", + "functional_weight", + ), + ) + input_data["dataframe"] = bpd.Series("training", index=input_data.index,).case_when( + [ + (((input_data["functional_weight"] % 10) == 8), "evaluation"), + (((input_data["functional_weight"] % 10) == 9), "prediction"), + ] + ) + del input_data["functional_weight"] + # [END bigquery_dataframes_bqml_boosted_tree_prepare] + assert input_data is not None From a87042158b181dceee31124fe208926a3bb1071f Mon Sep 17 00:00:00 2001 From: Arwa Sharif <146148342+arwas11@users.noreply.github.com> Date: Tue, 12 Nov 2024 09:28:21 -0600 Subject: [PATCH 19/20] docs: update Session doctrings to include exceptions (#1130) --- bigframes/session/__init__.py | 4 ++-- third_party/bigframes_vendored/pandas/io/gbq.py | 5 +++++ third_party/bigframes_vendored/pandas/io/parsers/readers.py | 2 ++ 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 004f5d322b..1c8d497974 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -500,7 +500,7 @@ def read_gbq_query( Raises: ValueError: - When both columns (preferred) and col_order are specified. + When both ``columns`` and ``col_order`` are specified. """ # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so # these docstrings are inline. @@ -552,7 +552,7 @@ def read_gbq_table( Raises: ValueError: - When both columns (preferred) and col_order are specified. + When both ``columns`` and ``col_order`` are specified. 
""" # NOTE: This method doesn't (yet) exist in pandas or pandas-gbq, so # these docstrings are inline. diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py index 4bd4353413..b4dd10ef10 100644 --- a/third_party/bigframes_vendored/pandas/io/gbq.py +++ b/third_party/bigframes_vendored/pandas/io/gbq.py @@ -160,6 +160,11 @@ def read_gbq( bigframes.exceptions.DefaultIndexWarning: Using the default index is discouraged, such as with clustered or partitioned tables without primary keys. + ValueError: + When both ``columns`` and ``col_order`` are specified. + ValueError: + If ``configuration`` is specified when directly reading + from a table. Returns: bigframes.pandas.DataFrame: A DataFrame representing results of the query or table. diff --git a/third_party/bigframes_vendored/pandas/io/parsers/readers.py b/third_party/bigframes_vendored/pandas/io/parsers/readers.py index 90154d8a00..910d3ad083 100644 --- a/third_party/bigframes_vendored/pandas/io/parsers/readers.py +++ b/third_party/bigframes_vendored/pandas/io/parsers/readers.py @@ -237,5 +237,7 @@ def read_json( bigframes.exceptions.DefaultIndexWarning: Using the default index is discouraged, such as with clustered or partitioned tables without primary keys. + ValueError: + ``lines`` is only valid when ``orient`` is ``records``. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From 14d9ac8579c79dc93a8398fca14e92b622f2cdd6 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 10:51:10 -0600 Subject: [PATCH 20/20] chore(main): release 1.26.0 (#1123) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 29 +++++++++++++++++++++++ bigframes/version.py | 2 +- third_party/bigframes_vendored/version.py | 2 +- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f18459a24..1df47b2afc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,35 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.26.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.25.0...v1.26.0) (2024-11-12) + + +### Features + +* Add basic geopandas functionality ([#962](https://github.com/googleapis/python-bigquery-dataframes/issues/962)) ([3759c63](https://github.com/googleapis/python-bigquery-dataframes/commit/3759c6397eaa3c46c4142aa51ca22be3dc8e4971)) +* Support `json_extract_string_array` in the `bigquery` module ([#1131](https://github.com/googleapis/python-bigquery-dataframes/issues/1131)) ([4ef8bac](https://github.com/googleapis/python-bigquery-dataframes/commit/4ef8bacdcc5447ba53c0f354526346f4dec7c5a1)) + + +### Bug Fixes + +* Fix Series.to_frame generating string label instead of int where name is None ([#1118](https://github.com/googleapis/python-bigquery-dataframes/issues/1118)) ([14e32b5](https://github.com/googleapis/python-bigquery-dataframes/commit/14e32b51c11c1718128f49ef94e754afc0ac0618)) +* Update the API documentation with newly added rep ([#1120](https://github.com/googleapis/python-bigquery-dataframes/issues/1120)) ([72c228b](https://github.com/googleapis/python-bigquery-dataframes/commit/72c228b15627e6047d60ae42740563a6dfea73da)) + + +### Performance Improvements + +* Reduce CURRENT_TIMESTAMP queries ([#1114](https://github.com/googleapis/python-bigquery-dataframes/issues/1114)) 
([32274b1](https://github.com/googleapis/python-bigquery-dataframes/commit/32274b130849b37d7e587643cf7b6d109455ff38))
+* Reduce dry runs from read_gbq with table ([#1129](https://github.com/googleapis/python-bigquery-dataframes/issues/1129)) ([f7e4354](https://github.com/googleapis/python-bigquery-dataframes/commit/f7e435488d630cf4cf493c89ecdde94a95a7a0d7))
+
+
+### Documentation
+
+* Add file for Classification with a Boosted Tree Model and snippet for preparing sample data ([#1135](https://github.com/googleapis/python-bigquery-dataframes/issues/1135)) ([7ac6639](https://github.com/googleapis/python-bigquery-dataframes/commit/7ac6639fb0e8baf5fb3adf5785dffd8cf9b06702))
+* Add snippet for Linear Regression tutorial Predict Outcomes section ([#1101](https://github.com/googleapis/python-bigquery-dataframes/issues/1101)) ([108f4a9](https://github.com/googleapis/python-bigquery-dataframes/commit/108f4a98463596d8df6d381b3580eb72eab41b6e))
+* Update `DataFrame` docstrings to include the errors section ([#1127](https://github.com/googleapis/python-bigquery-dataframes/issues/1127)) ([a38d4c4](https://github.com/googleapis/python-bigquery-dataframes/commit/a38d4c422b6b312f6a54d7b1dd105a474ec2e91a))
+* Update GroupBy docstrings ([#1103](https://github.com/googleapis/python-bigquery-dataframes/issues/1103)) ([9867a78](https://github.com/googleapis/python-bigquery-dataframes/commit/9867a788e7c46bf0850cacbe7cd41a11fea32d6b))
+* Update Session docstrings to include exceptions ([#1130](https://github.com/googleapis/python-bigquery-dataframes/issues/1130)) ([a870421](https://github.com/googleapis/python-bigquery-dataframes/commit/a87042158b181dceee31124fe208926a3bb1071f))
+
 ## [1.25.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.24.0...v1.25.0) (2024-10-29)
diff --git a/bigframes/version.py b/bigframes/version.py
index 1a818f9057..cdbacaa9cb 100644
--- a/bigframes/version.py
+++ b/bigframes/version.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version__ = "1.25.0"
+__version__ = "1.26.0"
diff --git a/third_party/bigframes_vendored/version.py b/third_party/bigframes_vendored/version.py
index 1a818f9057..cdbacaa9cb 100644
--- a/third_party/bigframes_vendored/version.py
+++ b/third_party/bigframes_vendored/version.py
@@ -12,4 +12,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__version__ = "1.25.0"
+__version__ = "1.26.0"
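
A minimal usage sketch for the `json_extract_string_array` feature shipped in this release follows. It is illustrative only: it assumes an authenticated BigQuery session, the sample values are made up, and `value_dtype="boolean"` is chosen to exercise the element-wise coercion path added in PATCH 16, which lowercases each extracted string and compares it against "true" before re-aggregating the exploded elements back into arrays.

import bigframes.pandas as bpd
import bigframes.bigquery as bbq

bpd.options.display.progress_bar = None

# JSON-formatted strings, each holding an array of boolean-like values.
s = bpd.Series(['["true", "false"]', '["TRUE"]'])

# Extract each array's elements as STRING values, then coerce them to the
# boolean dtype. Per the implementation above, non-string dtypes are applied
# by exploding the array, casting (or string-comparing, for booleans), and
# re-aggregating grouped on the original index.
result = bbq.json_extract_string_array(s, value_dtype="boolean")
print(result)

The explode/cast/re-aggregate design lets the same code path reuse the existing scalar `astype` machinery for every supported `value_dtype` instead of special-casing each array element type in SQL.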