From 30aaae554b5a0272b4d2900e7bd2f9530aa546b6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 31 Jul 2024 16:48:07 -0700 Subject: [PATCH 01/10] chore: reduce the `remote_function` cleanup rate (#873) * chore: reduce the `remote_function` cleanup rate * minor comment change --- tests/system/conftest.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 6bd7bf9348..83c8baac39 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -49,9 +49,11 @@ # We are running pytest with "-n 20". For a rough estimation, let's say all # parallel sessions run in parallel. So that allows 1000/20 = 50 mutations per # minute. One session takes about 1 minute to create a remote function. This -# would allow 50-1 = 49 deletions per session. As a heuristic let's use half of -# that potential for the clean up. -MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION = 25 +# would allow 50-1 = 49 deletions per session. +# However, because of b/356217175 the service may throw ResourceExhausted("Too +# many operations are currently being executed, try again later."), so we peg +# the cleanup to a more controlled rate. 
+MAX_NUM_FUNCTIONS_TO_DELETE_PER_SESSION = 15 CURRENT_DIR = pathlib.Path(__file__).parent DATA_DIR = CURRENT_DIR.parent / "data" From e9b05571123cf13079772856317ca3cd3d564c5a Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Thu, 1 Aug 2024 13:35:36 -0700 Subject: [PATCH 02/10] docs: update streaming notebook (#875) --- notebooks/streaming/streaming_dataframe.ipynb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/notebooks/streaming/streaming_dataframe.ipynb b/notebooks/streaming/streaming_dataframe.ipynb index a2da30720d..d4cc255fa5 100644 --- a/notebooks/streaming/streaming_dataframe.ipynb +++ b/notebooks/streaming/streaming_dataframe.ipynb @@ -5,12 +5,12 @@ "metadata": {}, "source": [ "### BigFrames StreamingDataFrame\n", - "bigframes.streaming.StreamingDataFrame is a special DataFrame type that allows simple operations and can create steaming jobs to BigTable and PubSub.\n", + "bigframes.streaming.StreamingDataFrame is a special DataFrame type that allows simple operations and can create streaming jobs to process real-time data and reverse ETL output to Bigtable and Pub/Sub using [BigQuery continuous queries](https://cloud.google.com/bigquery/docs/continuous-queries-introduction).\n", "\n", "In this notebook, we will:\n", "* Create a StreamingDataFrame from a BigQuery table\n", - "* Do some opeartions like select, filter and preview the content\n", - "* Create and manage streaming jobs to both BigTable and Pubsub" + "* Do some operations like select, filter and preview the content\n", + "* Create and manage streaming jobs to both Bigtable and Pub/Sub" ] }, { From 042db4b3d4e4142dabca305e706c78d7766697ef Mon Sep 17 00:00:00 2001 From: "gcf-owl-bot[bot]" <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Date: Thu, 1 Aug 2024 15:12:15 -0700 Subject: [PATCH 03/10] chore(python): fix docs build (#871) Source-Link: 
https://github.com/googleapis/synthtool/commit/bef813d194de29ddf3576eda60148b6b3dcc93d9 Post-Processor: gcr.io/cloud-devrel-public-resources/owlbot-python:latest@sha256:94bb690db96e6242b2567a4860a94d48fa48696d092e51b0884a1a2c0a79a407 Co-authored-by: Owl Bot --- .github/.OwlBot.lock.yaml | 4 ++-- .kokoro/docker/docs/Dockerfile | 9 ++++----- .kokoro/publish-docs.sh | 23 ++++++++++------------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index f30cb3775a..6d064ddb9b 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:52210e0e0559f5ea8c52be148b33504022e1faef4e95fbe4b32d68022af2fa7e -# created: 2024-07-08T19:25:35.862283192Z + digest: sha256:94bb690db96e6242b2567a4860a94d48fa48696d092e51b0884a1a2c0a79a407 +# created: 2024-07-31T14:52:44.926548819Z diff --git a/.kokoro/docker/docs/Dockerfile b/.kokoro/docker/docs/Dockerfile index 5205308b33..e5410e296b 100644 --- a/.kokoro/docker/docs/Dockerfile +++ b/.kokoro/docker/docs/Dockerfile @@ -72,19 +72,18 @@ RUN tar -xvf Python-3.10.14.tgz RUN ./Python-3.10.14/configure --enable-optimizations RUN make altinstall -RUN python3.10 -m venv /venv -ENV PATH /venv/bin:$PATH +ENV PATH /usr/local/bin/python3.10:$PATH ###################### Install pip RUN wget -O /tmp/get-pip.py 'https://bootstrap.pypa.io/get-pip.py' \ - && python3 /tmp/get-pip.py \ + && python3.10 /tmp/get-pip.py \ && rm /tmp/get-pip.py # Test pip -RUN python3 -m pip +RUN python3.10 -m pip # Install build requirements COPY requirements.txt /requirements.txt -RUN python3 -m pip install --require-hashes -r requirements.txt +RUN python3.10 -m pip install --require-hashes -r requirements.txt CMD ["python3.10"] diff --git a/.kokoro/publish-docs.sh b/.kokoro/publish-docs.sh index da9ce803dd..233205d580 100755 --- a/.kokoro/publish-docs.sh +++ 
b/.kokoro/publish-docs.sh @@ -21,18 +21,18 @@ export PYTHONUNBUFFERED=1 export PATH="${HOME}/.local/bin:${PATH}" # Install nox -python3 -m pip install --require-hashes -r .kokoro/requirements.txt -python3 -m nox --version +python3.10 -m pip install --require-hashes -r .kokoro/requirements.txt +python3.10 -m nox --version # build docs nox -s docs # create metadata -python3 -m docuploader create-metadata \ +python3.10 -m docuploader create-metadata \ --name=$(jq --raw-output '.name // empty' .repo-metadata.json) \ - --version=$(python3 setup.py --version) \ + --version=$(python3.10 setup.py --version) \ --language=$(jq --raw-output '.language // empty' .repo-metadata.json) \ - --distribution-name=$(python3 setup.py --name) \ + --distribution-name=$(python3.10 setup.py --name) \ --product-page=$(jq --raw-output '.product_documentation // empty' .repo-metadata.json) \ --github-repository=$(jq --raw-output '.repo // empty' .repo-metadata.json) \ --issue-tracker=$(jq --raw-output '.issue_tracker // empty' .repo-metadata.json) @@ -40,26 +40,23 @@ python3 -m docuploader create-metadata \ cat docs.metadata # upload docs -python3 -m docuploader upload docs/_build/html --metadata-file docs.metadata --staging-bucket "${STAGING_BUCKET}" +python3.10 -m docuploader upload docs/_build/html --metadata-file docs.metadata --staging-bucket "${STAGING_BUCKET}" # docfx yaml files nox -s docfx # create metadata. 
-python3 -m docuploader create-metadata \ +python3.10 -m docuploader create-metadata \ --name=$(jq --raw-output '.name // empty' .repo-metadata.json) \ - --version=$(python3 setup.py --version) \ + --version=$(python3.10 setup.py --version) \ --language=$(jq --raw-output '.language // empty' .repo-metadata.json) \ - --distribution-name=$(python3 setup.py --name) \ + --distribution-name=$(python3.10 setup.py --name) \ --product-page=$(jq --raw-output '.product_documentation // empty' .repo-metadata.json) \ --github-repository=$(jq --raw-output '.repo // empty' .repo-metadata.json) \ --issue-tracker=$(jq --raw-output '.issue_tracker // empty' .repo-metadata.json) cat docs.metadata -# Replace toc.yml template file -mv docs/templates/toc.yml docs/_build/html/docfx_yaml/toc.yml - # upload docs -python3 -m docuploader upload docs/_build/html/docfx_yaml --metadata-file docs.metadata --destination-prefix docfx --staging-bucket "${V2_STAGING_BUCKET}" +python3.10 -m docuploader upload docs/_build/html/docfx_yaml --metadata-file docs.metadata --destination-prefix docfx --staging-bucket "${V2_STAGING_BUCKET}" From 9959fc8fcba93441fdd3d9c17e8fdbe6e6a7b504 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Thu, 1 Aug 2024 16:55:24 -0700 Subject: [PATCH 04/10] fix: Fix issue with invalid sql generated by ml distance functions (#865) --- bigframes/core/compile/scalar_op_compiler.py | 29 ++++ bigframes/ml/core.py | 131 ++++++++---------- bigframes/ml/metrics/pairwise.py | 47 ++++--- bigframes/ml/sql.py | 51 +++---- bigframes/operations/__init__.py | 11 ++ bigframes/operations/type.py | 19 +++ .../system/small/ml/test_metrics_pairwise.py | 41 ++++++ tests/unit/ml/test_sql.py | 41 ++---- 8 files changed, 216 insertions(+), 154 deletions(-) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 0bc9f2e370..06e9481d17 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ 
-1380,6 +1380,30 @@ def minimum_impl( return ibis.case().when(upper.isnull() | (value > upper), upper).else_(value).end() +@scalar_op_compiler.register_binary_op(ops.cosine_distance_op) +def cosine_distance_impl( + vector1: ibis_types.Value, + vector2: ibis_types.Value, +): + return vector_distance(vector1, vector2, "COSINE") + + +@scalar_op_compiler.register_binary_op(ops.euclidean_distance_op) +def euclidean_distance_impl( + vector1: ibis_types.Value, + vector2: ibis_types.Value, +): + return vector_distance(vector1, vector2, "EUCLIDEAN") + + +@scalar_op_compiler.register_binary_op(ops.manhattan_distance_op) +def manhattan_distance_impl( + vector1: ibis_types.Value, + vector2: ibis_types.Value, +): + return vector_distance(vector1, vector2, "MANHATTAN") + + @scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True) def binary_remote_function_op_impl( x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp @@ -1501,3 +1525,8 @@ def json_set( json_obj: ibis_dtypes.JSON, json_path: ibis_dtypes.str, json_value ) -> ibis_dtypes.JSON: """Produces a new SQL JSON value with the specified JSON data inserted or replaced.""" + + +@ibis.udf.scalar.builtin(name="ML.DISTANCE") +def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64: + """Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")""" diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index f1b36651f4..d570945f16 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -17,7 +17,7 @@ from __future__ import annotations import datetime -from typing import Callable, cast, Iterable, Literal, Mapping, Optional, Union +from typing import Callable, cast, Iterable, Mapping, Optional, Union import uuid from google.cloud import bigquery @@ -35,11 +35,27 @@ def __init__(self, session: bigframes.Session): self._session = session self._base_sql_generator = ml_sql.BaseSqlGenerator() - def _apply_sql( + +class 
BqmlModel(BaseBqml): + """Represents an existing BQML model in BigQuery. + + Wraps the BQML API and SQL interface to expose the functionality needed for + BigQuery DataFrames ML. + """ + + def __init__(self, session: bigframes.Session, model: bigquery.Model): + self._session = session + self._model = model + self._model_manipulation_sql_generator = ml_sql.ModelManipulationSqlGenerator( + self.model_name + ) + + def _apply_ml_tvf( self, input_data: bpd.DataFrame, - func: Callable[[bpd.DataFrame], str], + apply_sql_tvf: Callable[[str], str], ) -> bpd.DataFrame: + # Used for predict, transform, distance """Helper to wrap a dataframe in a SQL query, keeping the index intact. Args: @@ -50,67 +66,28 @@ def _apply_sql( the dataframe to be wrapped func (function): - a function that will accept a SQL string and produce a new SQL - string from which to construct the output dataframe. It must - include the index columns of the input SQL. + Takes an input sql table value and applies a prediction tvf. The + resulting table value must include all input columns, with new + columns appended to the end. """ - _, index_col_ids, index_labels = input_data._to_sql_query(include_index=True) - - sql = func(input_data) - df = self._session.read_gbq(sql, index_col=index_col_ids) - df.index.names = index_labels - - return df - - def distance( - self, - x: bpd.DataFrame, - y: bpd.DataFrame, - type: Literal["EUCLIDEAN", "MANHATTAN", "COSINE"], - name: str, - ) -> bpd.DataFrame: - """Calculate ML.DISTANCE from DataFrame inputs. - - Args: - x: - input DataFrame - y: - input DataFrame - type: - Distance types, accept values are "EUCLIDEAN", "MANHATTAN", "COSINE". 
- name: - name of the output result column - """ - assert len(x.columns) == 1 and len(y.columns) == 1 - - input_data = x.join(y, how="outer").cache() - x_column_id, y_column_id = x._block.value_columns[0], y._block.value_columns[0] - - return self._apply_sql( - input_data, - lambda source_df: self._base_sql_generator.ml_distance( - x_column_id, - y_column_id, - type=type, - source_df=source_df, - name=name, - ), + # TODO: Preserve ordering information? + input_sql, index_col_ids, index_labels = input_data._to_sql_query( + include_index=True ) - -class BqmlModel(BaseBqml): - """Represents an existing BQML model in BigQuery. - - Wraps the BQML API and SQL interface to expose the functionality needed for - BigQuery DataFrames ML. - """ - - def __init__(self, session: bigframes.Session, model: bigquery.Model): - self._session = session - self._model = model - self._model_manipulation_sql_generator = ml_sql.ModelManipulationSqlGenerator( - self.model_name + result_sql = apply_sql_tvf(input_sql) + df = self._session.read_gbq(result_sql, index_col=index_col_ids) + df.index.names = index_labels + # Restore column labels + df.rename( + columns={ + label: original_label + for label, original_label in zip( + df.columns.values, input_data.columns.values + ) + } ) + return df def _keys(self): return (self._session, self._model) @@ -137,13 +114,13 @@ def model(self) -> bigquery.Model: return self._model def predict(self, input_data: bpd.DataFrame) -> bpd.DataFrame: - return self._apply_sql( + return self._apply_ml_tvf( input_data, self._model_manipulation_sql_generator.ml_predict, ) def transform(self, input_data: bpd.DataFrame) -> bpd.DataFrame: - return self._apply_sql( + return self._apply_ml_tvf( input_data, self._model_manipulation_sql_generator.ml_transform, ) @@ -153,10 +130,10 @@ def generate_text( input_data: bpd.DataFrame, options: Mapping[str, int | float], ) -> bpd.DataFrame: - return self._apply_sql( + return self._apply_ml_tvf( input_data, - lambda source_df: 
self._model_manipulation_sql_generator.ml_generate_text( - source_df=source_df, + lambda source_sql: self._model_manipulation_sql_generator.ml_generate_text( + source_sql=source_sql, struct_options=options, ), ) @@ -166,10 +143,10 @@ def generate_embedding( input_data: bpd.DataFrame, options: Mapping[str, int | float], ) -> bpd.DataFrame: - return self._apply_sql( + return self._apply_ml_tvf( input_data, - lambda source_df: self._model_manipulation_sql_generator.ml_generate_embedding( - source_df=source_df, + lambda source_sql: self._model_manipulation_sql_generator.ml_generate_embedding( + source_sql=source_sql, struct_options=options, ), ) @@ -179,10 +156,10 @@ def detect_anomalies( ) -> bpd.DataFrame: assert self._model.model_type in ("PCA", "KMEANS", "ARIMA_PLUS") - return self._apply_sql( + return self._apply_ml_tvf( input_data, - lambda source_df: self._model_manipulation_sql_generator.ml_detect_anomalies( - source_df=source_df, + lambda source_sql: self._model_manipulation_sql_generator.ml_detect_anomalies( + source_sql=source_sql, struct_options=options, ), ) @@ -192,7 +169,9 @@ def forecast(self, options: Mapping[str, int | float]) -> bpd.DataFrame: return self._session.read_gbq(sql, index_col="forecast_timestamp").reset_index() def evaluate(self, input_data: Optional[bpd.DataFrame] = None): - sql = self._model_manipulation_sql_generator.ml_evaluate(input_data) + sql = self._model_manipulation_sql_generator.ml_evaluate( + input_data.sql if (input_data is not None) else None + ) return self._session.read_gbq(sql) @@ -202,7 +181,7 @@ def llm_evaluate( task_type: Optional[str] = None, ): sql = self._model_manipulation_sql_generator.ml_llm_evaluate( - input_data, task_type + input_data.sql, task_type ) return self._session.read_gbq(sql) @@ -336,7 +315,7 @@ def create_model( model_ref = self._create_model_ref(session._anonymous_dataset) sql = self._model_creation_sql_generator.create_model( - source_df=input_data, + source_sql=input_data.sql, 
model_ref=model_ref, transforms=transforms, options=options, @@ -374,7 +353,7 @@ def create_llm_remote_model( model_ref = self._create_model_ref(session._anonymous_dataset) sql = self._model_creation_sql_generator.create_llm_remote_model( - source_df=input_data, + source_sql=input_data.sql, model_ref=model_ref, options=options, connection_name=connection_name, @@ -407,7 +386,7 @@ def create_time_series_model( model_ref = self._create_model_ref(session._anonymous_dataset) sql = self._model_creation_sql_generator.create_model( - source_df=input_data, + source_sql=input_data.sql, model_ref=model_ref, transforms=transforms, options=options, diff --git a/bigframes/ml/metrics/pairwise.py b/bigframes/ml/metrics/pairwise.py index bdbe4a682d..0e43412b21 100644 --- a/bigframes/ml/metrics/pairwise.py +++ b/bigframes/ml/metrics/pairwise.py @@ -17,19 +17,24 @@ import bigframes_vendored.sklearn.metrics.pairwise as vendored_metrics_pairwise -from bigframes.ml import core, utils +from bigframes.ml import utils +import bigframes.operations as ops import bigframes.pandas as bpd def paired_cosine_distances( X: Union[bpd.DataFrame, bpd.Series], Y: Union[bpd.DataFrame, bpd.Series] ) -> bpd.DataFrame: - X, Y = utils.convert_to_dataframe(X, Y) - if len(X.columns) != 1 or len(Y.columns) != 1: - raise ValueError("Inputs X and Y can only contain 1 column.") + X, Y = utils.convert_to_series(X, Y) + joined_block, _ = X._block.join(Y._block, how="outer") - base_bqml = core.BaseBqml(session=X._session) - return base_bqml.distance(X, Y, type="COSINE", name="cosine_distance") + result_block, _ = joined_block.project_expr( + ops.cosine_distance_op.as_expr( + joined_block.value_columns[0], joined_block.value_columns[1] + ), + label="cosine_distance", + ) + return bpd.DataFrame(result_block) paired_cosine_distances.__doc__ = inspect.getdoc( @@ -40,12 +45,16 @@ def paired_cosine_distances( def paired_manhattan_distance( X: Union[bpd.DataFrame, bpd.Series], Y: Union[bpd.DataFrame, bpd.Series] ) -> 
bpd.DataFrame: - X, Y = utils.convert_to_dataframe(X, Y) - if len(X.columns) != 1 or len(Y.columns) != 1: - raise ValueError("Inputs X and Y can only contain 1 column.") + X, Y = utils.convert_to_series(X, Y) + joined_block, _ = X._block.join(Y._block, how="outer") - base_bqml = core.BaseBqml(session=X._session) - return base_bqml.distance(X, Y, type="MANHATTAN", name="manhattan_distance") + result_block, _ = joined_block.project_expr( + ops.manhattan_distance_op.as_expr( + joined_block.value_columns[0], joined_block.value_columns[1] + ), + label="manhattan_distance", + ) + return bpd.DataFrame(result_block) paired_manhattan_distance.__doc__ = inspect.getdoc( @@ -56,12 +65,16 @@ def paired_manhattan_distance( def paired_euclidean_distances( X: Union[bpd.DataFrame, bpd.Series], Y: Union[bpd.DataFrame, bpd.Series] ) -> bpd.DataFrame: - X, Y = utils.convert_to_dataframe(X, Y) - if len(X.columns) != 1 or len(Y.columns) != 1: - raise ValueError("Inputs X and Y can only contain 1 column.") - - base_bqml = core.BaseBqml(session=X._session) - return base_bqml.distance(X, Y, type="EUCLIDEAN", name="euclidean_distance") + X, Y = utils.convert_to_series(X, Y) + joined_block, _ = X._block.join(Y._block, how="outer") + + result_block, _ = joined_block.project_expr( + ops.euclidean_distance_op.as_expr( + joined_block.value_columns[0], joined_block.value_columns[1] + ), + label="euclidean_distance", + ) + return bpd.DataFrame(result_block) paired_euclidean_distances.__doc__ = inspect.getdoc( diff --git a/bigframes/ml/sql.py b/bigframes/ml/sql.py index 0399db3a10..d14627f590 100644 --- a/bigframes/ml/sql.py +++ b/bigframes/ml/sql.py @@ -21,9 +21,9 @@ import google.cloud.bigquery import bigframes.constants as constants -import bigframes.pandas as bpd +# TODO: Add proper escaping logic from core/compile module class BaseSqlGenerator: """Generate base SQL strings for ML. 
Model name isn't needed in this class.""" @@ -170,12 +170,11 @@ def ml_distance( col_x: str, col_y: str, type: Literal["EUCLIDEAN", "MANHATTAN", "COSINE"], - source_df: bpd.DataFrame, + source_sql: str, name: str, ) -> str: """Encode ML.DISTANCE for BQML. https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-distance""" - source_sql, _, _ = source_df._to_sql_query(include_index=True) return f"""SELECT *, ML.DISTANCE({col_x}, {col_y}, '{type}') AS {name} FROM ({source_sql})""" @@ -191,14 +190,12 @@ def _model_id_sql( # Model create and alter def create_model( self, - source_df: bpd.DataFrame, + source_sql: str, model_ref: google.cloud.bigquery.ModelReference, options: Mapping[str, Union[str, int, float, Iterable[str]]] = {}, transforms: Optional[Iterable[str]] = None, ) -> str: """Encode the CREATE OR REPLACE MODEL statement for BQML""" - source_sql = source_df.sql - parts = [f"CREATE OR REPLACE MODEL {self._model_id_sql(model_ref)}"] if transforms: parts.append(self.transform(*transforms)) @@ -209,14 +206,12 @@ def create_model( def create_llm_remote_model( self, - source_df: bpd.DataFrame, + source_sql: str, connection_name: str, model_ref: google.cloud.bigquery.ModelReference, options: Mapping[str, Union[str, int, float, Iterable[str]]] = {}, ) -> str: """Encode the CREATE OR REPLACE MODEL statement for BQML""" - source_sql = source_df.sql - parts = [f"CREATE OR REPLACE MODEL {self._model_id_sql(model_ref)}"] parts.append(self.connection(connection_name)) if options: @@ -280,11 +275,6 @@ class ModelManipulationSqlGenerator(BaseSqlGenerator): def __init__(self, model_name: str): self._model_name = model_name - def _source_sql(self, source_df: bpd.DataFrame) -> str: - """Return DataFrame sql with index columns.""" - _source_sql, _, _ = source_df._to_sql_query(include_index=True) - return _source_sql - # Alter model def alter_model( self, @@ -298,10 +288,10 @@ def alter_model( return "\n".join(parts) # ML prediction TVFs - def 
ml_predict(self, source_df: bpd.DataFrame) -> str: + def ml_predict(self, source_sql: str) -> str: """Encode ML.PREDICT for BQML""" return f"""SELECT * FROM ML.PREDICT(MODEL `{self._model_name}`, - ({self._source_sql(source_df)}))""" + ({source_sql}))""" def ml_forecast(self, struct_options: Mapping[str, Union[int, float]]) -> str: """Encode ML.FORECAST for BQML""" @@ -310,38 +300,32 @@ def ml_forecast(self, struct_options: Mapping[str, Union[int, float]]) -> str: {struct_options_sql})""" def ml_generate_text( - self, source_df: bpd.DataFrame, struct_options: Mapping[str, Union[int, float]] + self, source_sql: str, struct_options: Mapping[str, Union[int, float]] ) -> str: """Encode ML.GENERATE_TEXT for BQML""" struct_options_sql = self.struct_options(**struct_options) return f"""SELECT * FROM ML.GENERATE_TEXT(MODEL `{self._model_name}`, - ({self._source_sql(source_df)}), {struct_options_sql})""" + ({source_sql}), {struct_options_sql})""" def ml_generate_embedding( - self, source_df: bpd.DataFrame, struct_options: Mapping[str, Union[int, float]] + self, source_sql: str, struct_options: Mapping[str, Union[int, float]] ) -> str: """Encode ML.GENERATE_EMBEDDING for BQML""" struct_options_sql = self.struct_options(**struct_options) return f"""SELECT * FROM ML.GENERATE_EMBEDDING(MODEL `{self._model_name}`, - ({self._source_sql(source_df)}), {struct_options_sql})""" + ({source_sql}), {struct_options_sql})""" def ml_detect_anomalies( - self, source_df: bpd.DataFrame, struct_options: Mapping[str, Union[int, float]] + self, source_sql: str, struct_options: Mapping[str, Union[int, float]] ) -> str: """Encode ML.DETECT_ANOMALIES for BQML""" struct_options_sql = self.struct_options(**struct_options) return f"""SELECT * FROM ML.DETECT_ANOMALIES(MODEL `{self._model_name}`, - {struct_options_sql}, ({self._source_sql(source_df)}))""" + {struct_options_sql}, ({source_sql}))""" # ML evaluation TVFs - def ml_evaluate(self, source_df: Optional[bpd.DataFrame] = None) -> str: + def 
ml_evaluate(self, source_sql: Optional[str] = None) -> str: """Encode ML.EVALUATE for BQML""" - if source_df is None: - source_sql = None - else: - # Note: don't need index as evaluate returns a new table - source_sql, _, _ = source_df._to_sql_query(include_index=False) - if source_sql is None: return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`)""" else: @@ -353,12 +337,9 @@ def ml_arima_coefficients(self) -> str: return f"""SELECT * FROM ML.ARIMA_COEFFICIENTS(MODEL `{self._model_name}`)""" # ML evaluation TVFs - def ml_llm_evaluate( - self, source_df: bpd.DataFrame, task_type: Optional[str] = None - ) -> str: + def ml_llm_evaluate(self, source_sql: str, task_type: Optional[str] = None) -> str: """Encode ML.EVALUATE for BQML""" # Note: don't need index as evaluate returns a new table - source_sql, _, _ = source_df._to_sql_query(include_index=False) return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`, ({source_sql}), STRUCT("{task_type}" AS task_type))""" @@ -383,7 +364,7 @@ def ml_principal_component_info(self) -> str: ) # ML transform TVF, that require a transform_only type model - def ml_transform(self, source_df: bpd.DataFrame) -> str: + def ml_transform(self, source_sql: str) -> str: """Encode ML.TRANSFORM for BQML""" return f"""SELECT * FROM ML.TRANSFORM(MODEL `{self._model_name}`, - ({self._source_sql(source_df)}))""" + ({source_sql}))""" diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 145c415ca0..23f2a50a95 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -690,6 +690,17 @@ def output_type(self, *input_types): ge_op = create_binary_op(name="ge", type_signature=op_typing.COMPARISON) +cosine_distance_op = create_binary_op( + name="ml_cosine_distance", type_signature=op_typing.VECTOR_METRIC +) +manhattan_distance_op = create_binary_op( + name="ml_manhattan_distance", type_signature=op_typing.VECTOR_METRIC +) +euclidean_distance_op = create_binary_op( + 
name="ml_euclidean_distance", type_signature=op_typing.VECTOR_METRIC +) + + ## String Ops @dataclasses.dataclass(frozen=True) class StrConcatOp(BinaryOp): diff --git a/bigframes/operations/type.py b/bigframes/operations/type.py index f469070805..ce37b8da55 100644 --- a/bigframes/operations/type.py +++ b/bigframes/operations/type.py @@ -190,6 +190,24 @@ def output_type( return left_type +@dataclasses.dataclass +class VectorMetric(BinaryTypeSignature): + """Type signature for binary vector metrics: two array-like operands of the same type, float result.""" + + def output_type( + self, left_type: ExpressionType, right_type: ExpressionType + ) -> ExpressionType: + if not bigframes.dtypes.is_array_like(left_type): + raise TypeError(f"Type {left_type} is not array-like") + if not bigframes.dtypes.is_array_like(right_type): + raise TypeError(f"Type {right_type} is not array-like") + if left_type != right_type: + raise TypeError( + f"Vector op operands {left_type} and {right_type} do not match" + ) + return bigframes.dtypes.FLOAT_DTYPE + + # Common type signatures UNARY_NUMERIC = TypePreserving(bigframes.dtypes.is_numeric, description="numeric") UNARY_REAL_NUMERIC = UnaryRealNumeric() @@ -212,3 +230,4 @@ def output_type( TIMELIKE_ACCESSOR = FixedOutputType( bigframes.dtypes.is_time_like, bigframes.dtypes.INT_DTYPE, description="time-like" ) +VECTOR_METRIC = VectorMetric() diff --git a/tests/system/small/ml/test_metrics_pairwise.py b/tests/system/small/ml/test_metrics_pairwise.py index 717f32667f..d3798f7cae 100644 --- a/tests/system/small/ml/test_metrics_pairwise.py +++ b/tests/system/small/ml/test_metrics_pairwise.py @@ -35,6 +35,47 @@ def test_paired_cosine_distances(): ) +def test_paired_cosine_distances_multiindex(): + x_col = [np.array([4.1, 0.5, 1.0])] + y_col = [np.array([3.0, 0.0, 2.5])] + data = bpd.read_pandas( + pd.DataFrame( + {("DATA", "X"): x_col, ("DATA", "Y"): y_col}, + ) + ) + + result = metrics.pairwise.paired_cosine_distances( + data[("DATA", "X")], data[("DATA", "Y")] + ) + expected_pd_df = 
pd.DataFrame( + { + ("DATA", "X"): x_col, + ("DATA", "Y"): y_col, + ("cosine_distance", ""): [0.108199], + } + ) + + pd.testing.assert_frame_equal( + result.to_pandas(), expected_pd_df, check_dtype=False, check_index_type=False + ) + + +def test_paired_cosine_distances_single_frame(): + x_col = [np.array([4.1, 0.5, 1.0])] + y_col = [np.array([3.0, 0.0, 2.5])] + input = bpd.read_pandas(pd.DataFrame({"X": x_col})) + input["Y"] = y_col # type: ignore + + result = metrics.pairwise.paired_cosine_distances(input.X, input.Y) + expected_pd_df = pd.DataFrame( + {"X": x_col, "Y": y_col, "cosine_distance": [0.108199]} + ) + + pd.testing.assert_frame_equal( + result.to_pandas(), expected_pd_df, check_dtype=False, check_index_type=False + ) + + def test_paired_manhattan_distance(): x_col = [np.array([4.1, 0.5, 1.0])] y_col = [np.array([3.0, 0.0, 2.5])] diff --git a/tests/unit/ml/test_sql.py b/tests/unit/ml/test_sql.py index e90146565d..cdf2d0b2e4 100644 --- a/tests/unit/ml/test_sql.py +++ b/tests/unit/ml/test_sql.py @@ -152,23 +152,12 @@ def test_polynomial_expand( assert sql == "ML.POLYNOMIAL_EXPAND(STRUCT(col_a, col_b), 2) AS poly_exp" -def test_distance_correct( - base_sql_generator: ml_sql.BaseSqlGenerator, - mock_df: bpd.DataFrame, -): - sql = base_sql_generator.ml_distance("col_a", "col_b", "COSINE", mock_df, "cosine") - assert ( - sql - == "SELECT *, ML.DISTANCE(col_a, col_b, 'COSINE') AS cosine FROM (input_X_sql)" - ) - - def test_create_model_correct( model_creation_sql_generator: ml_sql.ModelCreationSqlGenerator, mock_df: bpd.DataFrame, ): sql = model_creation_sql_generator.create_model( - source_df=mock_df, + source_sql=mock_df.sql, model_ref=bigquery.ModelReference.from_string( "test-proj._anonXYZ.create_model_correct_sql" ), @@ -189,7 +178,7 @@ def test_create_model_transform_correct( mock_df: bpd.DataFrame, ): sql = model_creation_sql_generator.create_model( - source_df=mock_df, + source_sql=mock_df.sql, model_ref=bigquery.ModelReference.from_string( 
"test-proj._anonXYZ.create_model_transform" ), @@ -217,7 +206,7 @@ def test_create_llm_remote_model_correct( mock_df: bpd.DataFrame, ): sql = model_creation_sql_generator.create_llm_remote_model( - source_df=mock_df, + source_sql=mock_df.sql, connection_name="my_project.us.my_connection", model_ref=bigquery.ModelReference.from_string( "test-proj._anonXYZ.create_remote_model" @@ -342,11 +331,11 @@ def test_ml_predict_correct( model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator, mock_df: bpd.DataFrame, ): - sql = model_manipulation_sql_generator.ml_predict(source_df=mock_df) + sql = model_manipulation_sql_generator.ml_predict(source_sql=mock_df.sql) assert ( sql == """SELECT * FROM ML.PREDICT(MODEL `my_project_id.my_dataset_id.my_model_id`, - (input_X_sql))""" + (input_X_y_sql))""" ) @@ -355,12 +344,12 @@ def test_ml_llm_evaluate_correct( mock_df: bpd.DataFrame, ): sql = model_manipulation_sql_generator.ml_llm_evaluate( - source_df=mock_df, task_type="CLASSIFICATION" + source_sql=mock_df.sql, task_type="CLASSIFICATION" ) assert ( sql == """SELECT * FROM ML.EVALUATE(MODEL `my_project_id.my_dataset_id.my_model_id`, - (input_X_sql), STRUCT("CLASSIFICATION" AS task_type))""" + (input_X_y_sql), STRUCT("CLASSIFICATION" AS task_type))""" ) @@ -368,11 +357,11 @@ def test_ml_evaluate_correct( model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator, mock_df: bpd.DataFrame, ): - sql = model_manipulation_sql_generator.ml_evaluate(source_df=mock_df) + sql = model_manipulation_sql_generator.ml_evaluate(source_sql=mock_df.sql) assert ( sql == """SELECT * FROM ML.EVALUATE(MODEL `my_project_id.my_dataset_id.my_model_id`, - (input_X_sql))""" + (input_X_y_sql))""" ) @@ -429,13 +418,13 @@ def test_ml_generate_text_correct( mock_df: bpd.DataFrame, ): sql = model_manipulation_sql_generator.ml_generate_text( - source_df=mock_df, + source_sql=mock_df.sql, struct_options={"option_key1": 1, "option_key2": 2.2}, ) assert ( sql == """SELECT * FROM 
ML.GENERATE_TEXT(MODEL `my_project_id.my_dataset_id.my_model_id`, - (input_X_sql), STRUCT( + (input_X_y_sql), STRUCT( 1 AS option_key1, 2.2 AS option_key2))""" ) @@ -446,13 +435,13 @@ def test_ml_generate_embedding_correct( mock_df: bpd.DataFrame, ): sql = model_manipulation_sql_generator.ml_generate_embedding( - source_df=mock_df, + source_sql=mock_df.sql, struct_options={"option_key1": 1, "option_key2": 2.2}, ) assert ( sql == """SELECT * FROM ML.GENERATE_EMBEDDING(MODEL `my_project_id.my_dataset_id.my_model_id`, - (input_X_sql), STRUCT( + (input_X_y_sql), STRUCT( 1 AS option_key1, 2.2 AS option_key2))""" ) @@ -463,7 +452,7 @@ def test_ml_detect_anomalies_correct_sql( mock_df: bpd.DataFrame, ): sql = model_manipulation_sql_generator.ml_detect_anomalies( - source_df=mock_df, + source_sql=mock_df.sql, struct_options={"option_key1": 1, "option_key2": 2.2}, ) assert ( @@ -471,7 +460,7 @@ def test_ml_detect_anomalies_correct_sql( == """SELECT * FROM ML.DETECT_ANOMALIES(MODEL `my_project_id.my_dataset_id.my_model_id`, STRUCT( 1 AS option_key1, - 2.2 AS option_key2), (input_X_sql))""" + 2.2 AS option_key2), (input_X_y_sql))""" ) From 2158818e53e09e55c87ffd574e3ebc2e201285fb Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 2 Aug 2024 10:02:13 -0700 Subject: [PATCH 05/10] feat: `df.apply(axis=1)` to support remote function with multiple params (#851) * feat: extend `df.apply(axis=1)` to support remote function with multiple params * add doctest, make small test remote function sticky * handle single param non-row-processing functions * reword the documentation a bit * handle missing input dtype in read_gbq_function * restore input types as tuple in read_gbq_function * clear previous remote function attributes * reword documentation for clarity * add/update comments to explain force reproject * make doctest example remote function with 3 params --- bigframes/core/compile/scalar_op_compiler.py | 26 ++- bigframes/dataframe.py | 148 ++++++++++------- 
bigframes/exceptions.py | 4 + bigframes/functions/remote_function.py | 40 ++++- bigframes/operations/__init__.py | 13 ++ bigframes/series.py | 11 +- bigframes/session/__init__.py | 4 +- tests/system/large/test_remote_function.py | 154 +++++++++++++++++- tests/system/small/test_remote_function.py | 80 +++++---- .../bigframes_vendored/pandas/core/frame.py | 43 ++++- 10 files changed, 417 insertions(+), 106 deletions(-) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 06e9481d17..67d0dac436 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -191,19 +191,27 @@ def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp): return decorator - def register_nary_op(self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]]): + def register_nary_op( + self, op_ref: typing.Union[ops.NaryOp, type[ops.NaryOp]], pass_op: bool = False + ): """ Decorator to register a nary op implementation. Args: op_ref (NaryOp or NaryOp type): Class or instance of operator that is implemented by the decorated function. + pass_op (bool): + Set to true if implementation takes the operator object as the last argument. + This is needed for parameterized ops where parameters are part of op object. 
""" key = typing.cast(str, op_ref.name) def decorator(impl: typing.Callable[..., ibis_types.Value]): def normalized_impl(args: typing.Sequence[ibis_types.Value], op: ops.RowOp): - return impl(*args) + if pass_op: + return impl(*args, op=op) + else: + return impl(*args) self._register(key, normalized_impl) return impl @@ -1468,6 +1476,7 @@ def clip_op( ) +# N-ary Operations @scalar_op_compiler.register_nary_op(ops.case_when_op) def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: # ibis can handle most type coercions, but we need to force bool -> int @@ -1487,6 +1496,19 @@ def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: return case_val.end() +@scalar_op_compiler.register_nary_op(ops.NaryRemoteFunctionOp, pass_op=True) +def nary_remote_function_op_impl( + *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp +): + ibis_node = getattr(op.func, "ibis_node", None) + if ibis_node is None: + raise TypeError( + f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}" + ) + result = ibis_node(*operands) + return result + + # Helpers def is_null(value) -> bool: # float NaN/inf should be treated as distinct from 'true' null values diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9789c7cf9f..9d3b153d3a 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3433,9 +3433,9 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: raise ValueError(f"na_action={na_action} not supported") # TODO(shobs): Support **kwargs - # Reproject as workaround to applying filter too late. This forces the filter - # to be applied before passing data to remote function, protecting from bad - # inputs causing errors. + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. 
reprojected_df = DataFrame(self._block._force_reproject()) return reprojected_df._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=(na_action is None)) @@ -3448,65 +3448,99 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): category=bigframes.exceptions.PreviewWarning, ) - # Early check whether the dataframe dtypes are currently supported - # in the remote function - # NOTE: Keep in sync with the value converters used in the gcf code - # generated in remote_function_template.py - remote_function_supported_dtypes = ( - bigframes.dtypes.INT_DTYPE, - bigframes.dtypes.FLOAT_DTYPE, - bigframes.dtypes.BOOL_DTYPE, - bigframes.dtypes.BYTES_DTYPE, - bigframes.dtypes.STRING_DTYPE, - ) - supported_dtypes_types = tuple( - type(dtype) - for dtype in remote_function_supported_dtypes - if not isinstance(dtype, pandas.ArrowDtype) - ) - # Check ArrowDtype separately since multiple BigQuery types map to - # ArrowDtype, including BYTES and TIMESTAMP. - supported_arrow_types = tuple( - dtype.pyarrow_dtype - for dtype in remote_function_supported_dtypes - if isinstance(dtype, pandas.ArrowDtype) - ) - supported_dtypes_hints = tuple( - str(dtype) for dtype in remote_function_supported_dtypes - ) - - for dtype in self.dtypes: - if ( - # Not one of the pandas/numpy types. - not isinstance(dtype, supported_dtypes_types) - # And not one of the arrow types. - and not ( - isinstance(dtype, pandas.ArrowDtype) - and any( - dtype.pyarrow_dtype.equals(arrow_type) - for arrow_type in supported_arrow_types - ) - ) - ): - raise NotImplementedError( - f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." - f" Supported dtypes are {supported_dtypes_hints}." 
- ) - # Check if the function is a remote function if not hasattr(func, "bigframes_remote_function"): raise ValueError("For axis=1 a remote function must be used.") - # Serialize the rows as json values - block = self._get_block() - rows_as_json_series = bigframes.series.Series( - block._get_rows_as_json_values() - ) + is_row_processor = getattr(func, "is_row_processor") + if is_row_processor: + # Early check whether the dataframe dtypes are currently supported + # in the remote function + # NOTE: Keep in sync with the value converters used in the gcf code + # generated in remote_function_template.py + remote_function_supported_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.BOOL_DTYPE, + bigframes.dtypes.BYTES_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + supported_dtypes_types = tuple( + type(dtype) + for dtype in remote_function_supported_dtypes + if not isinstance(dtype, pandas.ArrowDtype) + ) + # Check ArrowDtype separately since multiple BigQuery types map to + # ArrowDtype, including BYTES and TIMESTAMP. + supported_arrow_types = tuple( + dtype.pyarrow_dtype + for dtype in remote_function_supported_dtypes + if isinstance(dtype, pandas.ArrowDtype) + ) + supported_dtypes_hints = tuple( + str(dtype) for dtype in remote_function_supported_dtypes + ) - # Apply the function - result_series = rows_as_json_series._apply_unary_op( - ops.RemoteFunctionOp(func=func, apply_on_null=True) - ) + for dtype in self.dtypes: + if ( + # Not one of the pandas/numpy types. + not isinstance(dtype, supported_dtypes_types) + # And not one of the arrow types. + and not ( + isinstance(dtype, pandas.ArrowDtype) + and any( + dtype.pyarrow_dtype.equals(arrow_type) + for arrow_type in supported_arrow_types + ) + ) + ): + raise NotImplementedError( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1." + f" Supported dtypes are {supported_dtypes_hints}." 
+ ) + + # Serialize the rows as json values + block = self._get_block() + rows_as_json_series = bigframes.series.Series( + block._get_rows_as_json_values() + ) + + # Apply the function + result_series = rows_as_json_series._apply_unary_op( + ops.RemoteFunctionOp(func=func, apply_on_null=True) + ) + else: + # This is a special case where we are providing not-pandas-like + # extension. If the remote function can take one or more params + # then we assume that here the user intention is to use the + # column values of the dataframe as arguments to the function. + # For this to work the following conditions must be true: + # 1. The number of input params in the function must be the same + # as the number of columns in the dataframe + # 2. The dtypes of the columns in the dataframe must be + # compatible with the data types of the input params + # 3. The order of the columns in the dataframe must correspond + # to the order of the input params in the function + udf_input_dtypes = getattr(func, "input_dtypes") + if len(udf_input_dtypes) != len(self.columns): + raise ValueError( + f"Remote function takes {len(udf_input_dtypes)} arguments but DataFrame has {len(self.columns)} columns." + ) + if udf_input_dtypes != tuple(self.dtypes.to_list()): + raise ValueError( + f"Remote function takes arguments of types {udf_input_dtypes} but DataFrame dtypes are {tuple(self.dtypes)}." + ) + + series_list = [self[col] for col in self.columns] + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. 
+ reprojected_series = bigframes.series.Series( + series_list[0]._block._force_reproject() + ) + result_series = reprojected_series._apply_nary_op( + ops.NaryRemoteFunctionOp(func=func), series_list[1:] + ) result_series.name = None # Return Series with materialized result so that any error in the remote diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 1d31749760..6c5b66bc47 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -57,3 +57,7 @@ class QueryComplexityError(RuntimeError): class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" + + +class UnknownDataTypeWarning(Warning): + """Data type is unknown.""" diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index d84fbcdbab..b3c6aee1b3 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -66,6 +66,7 @@ from bigframes import clients import bigframes.constants as constants import bigframes.core.compile.ibis_types +import bigframes.dtypes import bigframes.functions.remote_function_template logger = logging.getLogger(__name__) @@ -895,8 +896,8 @@ def remote_function( reuse (bool, Optional): Reuse the remote function if already exists. `True` by default, which will result in reusing an existing remote - function and corresponding cloud function (if any) that was - previously created for the same udf. + function and corresponding cloud function that was previously + created (if any) for the same udf. Please note that for an unnamed (i.e. created without an explicit `name` argument) remote function, the BigQuery DataFrames session id is attached in the cloud artifacts names. 
So for the @@ -1174,7 +1175,9 @@ def try_delattr(attr): try_delattr("bigframes_cloud_function") try_delattr("bigframes_remote_function") + try_delattr("input_dtypes") try_delattr("output_dtype") + try_delattr("is_row_processor") try_delattr("ibis_node") ( @@ -1216,12 +1219,20 @@ def try_delattr(attr): rf_name ) ) - + func.input_dtypes = tuple( + [ + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + input_type + ) + for input_type in ibis_signature.input_types + ] + ) func.output_dtype = ( bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( ibis_signature.output_type ) ) + func.is_row_processor = is_row_processor func.ibis_node = node # If a new remote function was created, update the cloud artifacts @@ -1305,6 +1316,29 @@ def func(*ignored_args, **ignored_kwargs): signature=(ibis_signature.input_types, ibis_signature.output_type), ) func.bigframes_remote_function = str(routine_ref) # type: ignore + + # set input bigframes data types + has_unknown_dtypes = False + function_input_dtypes = [] + for ibis_type in ibis_signature.input_types: + input_dtype = cast(bigframes.dtypes.Dtype, bigframes.dtypes.DEFAULT_DTYPE) + if ibis_type is None: + has_unknown_dtypes = True + else: + input_dtype = ( + bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( + ibis_type + ) + ) + function_input_dtypes.append(input_dtype) + if has_unknown_dtypes: + warnings.warn( + "The function has one or more missing input data types." 
+ f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.", + category=bigframes.exceptions.UnknownDataTypeWarning, + ) + func.input_dtypes = tuple(function_input_dtypes) # type: ignore + func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 23f2a50a95..523882c14e 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -659,6 +659,19 @@ def output_type(self, *input_types): raise AttributeError("output_dtype not defined") +@dataclasses.dataclass(frozen=True) +class NaryRemoteFunctionOp(NaryOp): + name: typing.ClassVar[str] = "nary_remote_function" + func: typing.Callable + + def output_type(self, *input_types): + # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method + if hasattr(self.func, "output_dtype"): + return self.func.output_dtype + else: + raise AttributeError("output_dtype not defined") + + add_op = AddOp() sub_op = SubOp() mul_op = create_binary_op(name="mul", type_signature=op_typing.BINARY_NUMERIC) diff --git a/bigframes/series.py b/bigframes/series.py index 1a5661529c..9e33801834 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1442,9 +1442,6 @@ def apply( ) -> Series: # TODO(shobs, b/274645634): Support convert_dtype, args, **kwargs # is actually a ternary op - # Reproject as workaround to applying filter too late. This forces the filter - # to be applied before passing data to remote function, protecting from bad - # inputs causing errors. if by_row not in ["compat", False]: raise ValueError("Param by_row must be one of 'compat' or False") @@ -1474,7 +1471,10 @@ def apply( ex.message += f"\n{_remote_function_recommendation_message}" raise - # We are working with remote function at this point + # We are working with remote function at this point. 
+ # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_series = Series(self._block._force_reproject()) result_series = reprojected_series._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) @@ -1507,6 +1507,9 @@ def combine( ex.message += f"\n{_remote_function_recommendation_message}" raise + # Reproject as workaround to applying filter too late. This forces the + # filter to be applied before passing data to remote function, + # protecting from bad inputs causing errors. reprojected_series = Series(self._block._force_reproject()) result_series = reprojected_series._apply_binary_op( other, ops.BinaryRemoteFunctionOp(func=func) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 98cba867f2..233e6ef930 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1661,8 +1661,8 @@ def remote_function( reuse (bool, Optional): Reuse the remote function if already exists. `True` by default, which will result in reusing an existing remote - function and corresponding cloud function (if any) that was - previously created for the same udf. + function and corresponding cloud function that was previously + created (if any) for the same udf. Please note that for an unnamed (i.e. created without an explicit `name` argument) remote function, the BigQuery DataFrames session id is attached in the cloud artifacts names. 
So for the diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 303c74f1fd..095f7059cd 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -28,6 +28,9 @@ import test_utils.prefixer import bigframes +import bigframes.dataframe +import bigframes.dtypes +import bigframes.exceptions import bigframes.functions.remote_function as bigframes_rf import bigframes.pandas as bpd import bigframes.series @@ -363,7 +366,8 @@ def test_remote_function_input_types(session, scalars_dfs, input_types): def add_one(x): return x + 1 - remote_add_one = session.remote_function(input_types, int)(add_one) + remote_add_one = session.remote_function(input_types, int, reuse=False)(add_one) + assert remote_add_one.input_dtypes == (bigframes.dtypes.INT_DTYPE,) scalars_df, scalars_pandas_df = scalars_dfs @@ -1589,6 +1593,8 @@ def serialize_row(row): bigframes.series.Series, str, reuse=False )(serialize_row) + assert getattr(serialize_row_remote, "is_row_processor") + bf_result = scalars_df[columns].apply(serialize_row_remote, axis=1).to_pandas() pd_result = scalars_pandas_df[columns].apply(serialize_row, axis=1) @@ -1622,7 +1628,11 @@ def analyze(row): } ) - analyze_remote = session.remote_function(bigframes.series.Series, str)(analyze) + analyze_remote = session.remote_function( + bigframes.series.Series, str, reuse=False + )(analyze) + + assert getattr(analyze_remote, "is_row_processor") bf_result = ( scalars_df[columns].dropna().apply(analyze_remote, axis=1).to_pandas() @@ -1727,6 +1737,8 @@ def serialize_row(row): bigframes.series.Series, str, reuse=False )(serialize_row) + assert getattr(serialize_row_remote, "is_row_processor") + bf_result = bf_df.apply(serialize_row_remote, axis=1).to_pandas() pd_result = pd_df.apply(serialize_row, axis=1) @@ -1787,6 +1799,8 @@ def float_parser(row): bigframes.series.Series, float, reuse=False )(float_parser) + assert getattr(float_parser_remote, 
"is_row_processor") + pd_result = pd_df.apply(float_parser, axis=1) bf_result = bf_df.apply(float_parser_remote, axis=1).to_pandas() @@ -1913,7 +1927,7 @@ def test_remote_function_named_perists_w_session_cleanup(): name = test_utils.prefixer.Prefixer("bigframes", "").create_prefix() # create an unnamed remote function in the session - @session.remote_function(name=name) + @session.remote_function(reuse=False, name=name) def foo(x: int) -> int: return x + 1 @@ -2004,3 +2018,137 @@ def foo_named(x: int) -> int: cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, foo_named ) + + +def test_df_apply_axis_1_multiple_params(session): + bf_df = bigframes.dataframe.DataFrame( + { + "Id": [1, 2, 3], + "Age": [22.5, 23, 23.5], + "Name": ["alpha", "beta", "gamma"], + } + ) + + expected_dtypes = ( + bigframes.dtypes.INT_DTYPE, + bigframes.dtypes.FLOAT_DTYPE, + bigframes.dtypes.STRING_DTYPE, + ) + + # Assert the dataframe dtypes + assert tuple(bf_df.dtypes) == expected_dtypes + + try: + + @session.remote_function([int, float, str], str, reuse=False) + def foo(x, y, z): + return f"I got {x}, {y} and {z}" + + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 2 columns\\.$", + ): + bf_df[["Id", "Age"]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 3 arguments but DataFrame has 4 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Age=bf_df["Age"].astype("Int64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their 
datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1, 22.5 and alpha", + "I got 2, 23 and beta", + "I got 3, 23.5 and gamma", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo + ) + + +def test_df_apply_axis_1_single_param_non_series(session): + bf_df = bigframes.dataframe.DataFrame( + { + "Id": [1, 2, 3], + } + ) + + expected_dtypes = (bigframes.dtypes.INT_DTYPE,) + + # Assert the dataframe dtypes + assert tuple(bf_df.dtypes) == expected_dtypes + + try: + + @session.remote_function([int], str, reuse=False) + def foo(x): + return f"I got {x}" + + assert getattr(foo, "is_row_processor") is False + assert getattr(foo, "input_dtypes") == expected_dtypes + + # Fails to apply on dataframe with incompatible number of columns + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 0 columns\\.$", + ): + bf_df[[]].apply(foo, axis=1) + with pytest.raises( + ValueError, + match="^Remote function takes 1 arguments but DataFrame has 2 columns\\.$", + ): + bf_df.assign(Country="lalaland").apply(foo, axis=1) + + # Fails to apply on dataframe with incompatible column datatypes + with pytest.raises( + ValueError, + match="^Remote function takes arguments of types .* but DataFrame dtypes are .*", + ): + bf_df.assign(Id=bf_df["Id"].astype("Float64")).apply(foo, axis=1) + + # Successfully applies to dataframe with matching number of columns + # and their datatypes + bf_result = bf_df.apply(foo, axis=1).to_pandas() + + # Since this scenario is not pandas-like, let's handcraft the + # expected result + expected_result = pandas.Series( + [ + "I got 1", + "I got 2", + "I got 
3", + ] + ) + + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, foo + ) diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index c07a0afb44..8ecf9eb368 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -21,6 +21,7 @@ import pytest import bigframes +import bigframes.dtypes import bigframes.exceptions from bigframes.functions import remote_function as rf from tests.system.utils import assert_pandas_df_equal @@ -708,6 +709,8 @@ def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): # It should point to the named routine and yield the expected results. assert square.bigframes_remote_function == str(routine.reference) + assert square.input_dtypes == (bigframes.dtypes.INT_DTYPE,) + assert square.output_dtype == bigframes.dtypes.INT_DTYPE src = {"x": [-5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5]} @@ -776,10 +779,14 @@ def test_read_gbq_function_enforces_explicit_types( str(both_types_specified.reference), session=session, ) - rf.read_gbq_function( - str(only_return_type_specified.reference), - session=session, - ) + with pytest.warns( + bigframes.exceptions.UnknownDataTypeWarning, + match="missing input data types.*assume default data type", + ): + rf.read_gbq_function( + str(only_return_type_specified.reference), + session=session, + ) with pytest.raises(ValueError): rf.read_gbq_function( str(only_arg_type_specified.reference), @@ -919,36 +926,41 @@ def add_ints(row): scalars_df[columns].apply(add_ints, axis=1) -@pytest.mark.parametrize( - ("column"), - [ - pytest.param("date_col"), - pytest.param("datetime_col"), - pytest.param("geography_col"), - pytest.param("numeric_col"), - pytest.param("time_col"), - 
pytest.param("timestamp_col"), - ], -) -def test_df_apply_axis_1_unsupported_dtype(scalars_dfs, column): - scalars_df, scalars_pandas_df = scalars_dfs - - # It doesn't matter if it is a remote function or not, the dtype check - # is done even before the function type check with axis=1 - def echo(row): - return row[column] +@pytest.mark.flaky(retries=2, delay=120) +def test_df_apply_axis_1_unsupported_dtype(session, scalars_dfs, dataset_id_permanent): + columns_with_not_supported_dtypes = [ + "date_col", + "datetime_col", + "geography_col", + "numeric_col", + "time_col", + "timestamp_col", + ] - # pandas works - scalars_pandas_df[[column]].apply(echo, axis=1) + scalars_df, scalars_pandas_df = scalars_dfs - dtype = scalars_df[column].dtype + def echo_len(row): + return len(row) - with pytest.raises( - NotImplementedError, - match=re.escape( - f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are (" - ), - ), pytest.warns( - bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." - ): - scalars_df[[column]].apply(echo, axis=1) + echo_len_remote = session.remote_function( + bigframes.series.Series, + float, + dataset_id_permanent, + name=get_rf_name(echo_len, is_row_processor=True), + )(echo_len) + + for column in columns_with_not_supported_dtypes: + # pandas works + scalars_pandas_df[[column]].apply(echo_len, axis=1) + + dtype = scalars_df[column].dtype + + with pytest.raises( + NotImplementedError, + match=re.escape( + f"DataFrame has a column of dtype '{dtype}' which is not supported with axis=1. Supported dtypes are (" + ), + ), pytest.warns( + bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." 
+ ): + scalars_df[[column]].apply(echo_len_remote, axis=1) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 7048d9c6dd..10565a2552 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4361,9 +4361,50 @@ def apply(self, func, *, axis=0, args=(), **kwargs): 1 19 dtype: Int64 + You could also apply a remote function which accepts multiple parameters + to every row of a DataFrame by using it with `axis=1` if the DataFrame + has a matching number of columns and data types. Note: This feature is + currently in **preview**. + + >>> df = bpd.DataFrame({ + ... 'col1': [1, 2], + ... 'col2': [3, 4], + ... 'col3': [5, 5] + ... }) + >>> df + col1 col2 col3 + 0 1 3 5 + 1 2 4 5 + + [2 rows x 3 columns] + + >>> @bpd.remote_function(reuse=False) + ... def foo(x: int, y: int, z: int) -> float: + ... result = 1 + ... result += x + ... result += y/z + ... return result + + >>> df.apply(foo, axis=1) + 0 2.6 + 1 3.8 + dtype: Float64 + Args: func (function): - Function to apply to each column or row. + Function to apply to each column or row. To apply to each row + (i.e. when `axis=1` is specified) the function can be of one of + the two types: + + (1). It accepts a single input parameter of type `Series`, in + which case each row is delivered to the function as a pandas + Series. + + (2). It accepts one or more parameters, in which case column values + are delivered to the function as separate arguments (mapping + to those parameters) for each row. For this to work the + `DataFrame` must have the same number of columns and matching + data types. axis ({index (0), columns (1)}): Axis along which the function is applied. Specify 0 or 'index' to apply function to each column. 
Specify 1 or 'columns' to From 8753bdd1e44701e56eae914ebc0e91d9b1a6adf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Fri, 2 Aug 2024 12:37:25 -0500 Subject: [PATCH 06/10] feat: create a separate OrderingModePartialPreviewWarning for more fine-grained warning filters (#879) --- bigframes/exceptions.py | 4 ++++ bigframes/session/__init__.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 6c5b66bc47..b1af96c9c4 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -47,6 +47,10 @@ class NullIndexError(ValueError): """Object has no index.""" +class OrderingModePartialPreviewWarning(PreviewWarning): + """Ordering mode 'partial' is in preview.""" + + class OrderRequiredError(ValueError): """Operation requires total row ordering to be enabled.""" diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 233e6ef930..f449b52fbf 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -302,7 +302,7 @@ def __init__( if not self._strictly_ordered: warnings.warn( "Partial ordering mode is a preview feature and is subject to change.", - bigframes.exceptions.PreviewWarning, + bigframes.exceptions.OrderingModePartialPreviewWarning, ) # Sequential index needs total ordering to generate, so use null index with unstrict ordering. 
From 9606dac3303e4cb97dc679295db6576f644f438a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Fri, 2 Aug 2024 12:41:30 -0500 Subject: [PATCH 07/10] chore: move OrderingMode to enums module (#870) --- bigframes/_config/bigquery_options.py | 18 ++++++------------ bigframes/enums.py | 10 ++++++++++ 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 0506f1841e..34b9a3128f 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -16,7 +16,6 @@ from __future__ import annotations -from enum import Enum from typing import Literal, Optional import warnings @@ -25,14 +24,9 @@ import jellyfish import bigframes.constants +import bigframes.enums import bigframes.exceptions - -class OrderingMode(Enum): - STRICT = "strict" - PARTIAL = "partial" - - SESSION_STARTED_MESSAGE = ( "Cannot change '{attribute}' once a session has started. " "Call bigframes.pandas.close_session() first, if you are using the bigframes.pandas API." 
@@ -64,11 +58,11 @@ def _validate_location(value: Optional[str]): ) -def _validate_ordering_mode(value: str) -> OrderingMode: - if value.casefold() == OrderingMode.STRICT.value.casefold(): - return OrderingMode.STRICT - if value.casefold() == OrderingMode.PARTIAL.value.casefold(): - return OrderingMode.PARTIAL +def _validate_ordering_mode(value: str) -> bigframes.enums.OrderingMode: + if value.casefold() == bigframes.enums.OrderingMode.STRICT.value.casefold(): + return bigframes.enums.OrderingMode.STRICT + if value.casefold() == bigframes.enums.OrderingMode.PARTIAL.value.casefold(): + return bigframes.enums.OrderingMode.PARTIAL raise ValueError("Ordering mode must be one of 'strict' or 'partial'.") diff --git a/bigframes/enums.py b/bigframes/enums.py index 9501d3f13e..fd7b5545bb 100644 --- a/bigframes/enums.py +++ b/bigframes/enums.py @@ -20,6 +20,16 @@ import enum +class OrderingMode(enum.Enum): + """[Preview] Values used to determine the ordering mode. + + Default is 'strict'. + """ + + STRICT = "strict" + PARTIAL = "partial" + + class DefaultIndexKind(enum.Enum): """Sentinel values used to override default indexing behavior.""" From c415eb91eb71dea53d245ba2bce416062e3f02f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 5 Aug 2024 12:29:38 -0500 Subject: [PATCH 08/10] docs: create sample notebook using `ordering_mode="partial"` (#880) --- notebooks/dataframes/pypi.ipynb | 335 ++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 notebooks/dataframes/pypi.ipynb diff --git a/notebooks/dataframes/pypi.ipynb b/notebooks/dataframes/pypi.ipynb new file mode 100644 index 0000000000..3022dc7173 --- /dev/null +++ b/notebooks/dataframes/pypi.ipynb @@ -0,0 +1,335 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "# Copyright 2024 Google LLC\n", + "#\n", + "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", + "# you 
may not use this file except in compliance with the License.\n", + "# You may obtain a copy of the License at\n", + "#\n", + "# https://www.apache.org/licenses/LICENSE-2.0\n", + "#\n", + "# Unless required by applicable law or agreed to in writing, software\n", + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", + "# See the License for the specific language governing permissions and\n", + "# limitations under the License." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Analyzing Python dependencies with BigQuery DataFrames\n", + "\n", + "In this notebook, you'll use the [PyPI public dataset](https://console.cloud.google.com/marketplace/product/gcp-public-data-pypi/pypi) and the [deps.dev public dataset](https://deps.dev/) to visualize Python package downloads for a package and its dependencies.\n", + "\n", + "> **⚠ Important**\n", + ">\n", + "> You'll use features that are currently in [preview](https://cloud.google.com/blog/products/gcp/google-cloud-gets-simplified-product-launch-stages): `ordering_mode=\"partial\"` and \"NULL\" indexes. There may be breaking changes to this functionality in future versions of the BigQuery DataFrames package.\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "import bigframes.pandas as bpd\n", + "\n", + "# Preview feature warning:\n", + "# Use `ordering_mode=\"partial\"` for more efficient query generation, but\n", + "# some pandas-compatible methods may not be possible without a total ordering.\n", + "bpd.options.bigquery.ordering_mode = \"partial\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Filter out the relevant warnings for preview features used."
+ ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "import warnings\n", + "\n", + "import bigframes.exceptions\n", + "\n", + "warnings.simplefilter(\"ignore\", category=bigframes.exceptions.NullIndexPreviewWarning)\n", + "warnings.simplefilter(\"ignore\", category=bigframes.exceptions.OrderingModePartialPreviewWarning)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Counting downloads and tracking dependencies\n", + "\n", + "The [PyPI `file_downloads`](https://console.cloud.google.com/bigquery?ws=!1m5!1m4!4m3!1sbigquery-public-data!2spypi!3sfile_downloads) table contains a row for each time there is a download request for a package. The [deps.dev Dependencies](https://console.cloud.google.com/bigquery?ws=!1m5!1m4!4m3!1sbigquery-public-data!2sdeps_dev_v1!3sDependencies) table contains a row for each dependency of each package.\n", + "\n", + "When `ordering_mode = \"partial\"`, `read_gbq_table` creates a DataFrame representing the table, but the DataFrame has no native ordering or index." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "import bigframes.enums\n", + "\n", + "# Without ordering_mode = \"partial\" it is recommended that you set\n", + "# the \"filters\" parameter to limit the number of rows subsequent queries\n", + "# have to read.\n", + "pypi = bpd.read_gbq_table(\n", + " \"bigquery-public-data.pypi.file_downloads\",\n", + "\n", + " # Using ordering_mode = \"partial\" changes the default index to a \"NULL\"\n", + " # index, meaning no index is available for implicit joins.\n", + " #\n", + " # Setting this explicitly avoids a DefaultIndexWarning.\n", + " index_col=bigframes.enums.DefaultIndexKind.NULL,\n", + ")\n", + "deps = bpd.read_gbq_table(\n", + " \"bigquery-public-data.deps_dev_v1.Dependencies\",\n", + " index_col=bigframes.enums.DefaultIndexKind.NULL,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Limit to the most recent 30 days of data\n", + "\n", + "The PyPI and deps.dev tables are partitioned by date. Query only the most recent 30 days of data to reduce the number of bytes scanned.\n", + "\n", + "Just as with the default ordering mode, filters can be described in a pandas-compatible way by passing a Boolean Series to the DataFrame's `__getitem__` accessor." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "import datetime\n", + "\n", + "last_30_days = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)\n", + "pypi = pypi[pypi[\"timestamp\"] > last_30_days]\n", + "deps = deps[(deps[\"SnapshotAt\"] > last_30_days) & (deps[\"System\"] == \"PYPI\")]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n", + "**⚠ Warning**\n", + "\n", + "Without `ordering_mode = \"partial\"`, these filters do not change the number of bytes scanned. Instead, add column and row filters at \"read\" time.
For example,\n", + "\n", + "```\n", + "import datetime\n", + "\n", + "last_30_days = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)\n", + "\n", + "# Without ordering_mode = \"partial\", one must limit the data at \"read\" time to reduce bytes scanned.\n", + "pypi = bpd.read_gbq_table(\n", + " \"bigquery-public-data.pypi.file_downloads\",\n", + " columns=[\"timestamp\", \"project\"],\n", + " filters=[(\"timestamp\", \">\", last_30_days)],\n", + ")\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Find dependencies for pandas\n", + "\n", + "Use assign to add columns to the DataFrame after a scalar operation, such as extracting a sub-field from a `STRUCT` column.\n", + "\n", + "Because the DataFrame has no index, this does not work if the new column belongs to a different table expression." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "deps = deps.assign(DependencyName=deps[\"Dependency\"].struct.field(\"Name\"))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Use an aggregation to identify the unique `DependencyName`s for the `pandas` package. Note: `drop_duplicates()` is not supported, as the order-based behavior such as `keep=\"first\"` is not applicable when using `ordering_mode = \"partial\"`.\n", + "\n", + "A DataFrame with no index still supports aggregation operations. Set `as_index=False` to keep the GROUP BY keys as regular columns, instead of turning them into an index."
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pandas_deps = deps[deps[\"Name\"] == \"pandas\"].groupby([\"Name\", \"DependencyName\"], as_index=False).size()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Count downloads for pandas and its dependencies\n", + "\n", + "The previous step created `pandas_deps` with all the dependencies of `pandas` but not pandas itself.\n", + "\n", + "Combine two DataFrames with the same column names with the `bigframes.pandas.concat` function." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "pandas_and_deps = bpd.concat(\n", + " [\n", + " pandas_deps.drop(columns=[\"Name\", \"size\"]).rename(columns={\"DependencyName\": \"Name\"}),\n", + " bpd.DataFrame({\"Name\": [\"pandas\"]}),\n", + " ],\n", + "\n", + " # To join DataFrames that have a NULL index, set ignore_index = True.\n", + " ignore_index=True,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Since there is no index to implicitly join on, use the `merge` method to join two DataFrames by column name." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "pandas_pypi = pandas_and_deps.merge(pypi, how=\"inner\", left_on=\"Name\", right_on=\"project\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a time series to visualize by grouping by the date, extracted from the `timestamp` column." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Query job 5aa35b9c-459a-4b46-b70c-36e6418b61eb is DONE. 920.8 GB processed. 
Open Job" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# When BigQuery DataFrames aggregates over columns, those columns provide a\n", + "# unique key post-aggregation that is used for ordering. By aggregating over\n", + "# a time series, the line plots will render in the expected order.\n", + "pandas_pypi = pandas_pypi.assign(date=pandas_pypi[\"timestamp\"].dt.date)\n", + "downloads_per_day = pandas_pypi.groupby([\"date\", \"project\"]).size()\n", + "\n", + "# Convert to a pandas DataFrame for further transformation and visualization.\n", + "pd_df = downloads_per_day.to_pandas()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Once you've downloaded the time series with the `to_pandas()` method, you can use typical pandas methods to visualize the data." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "pd_timeseries = pd_df.unstack()\n", + "pd_timeseries.plot.line(rot=45, ylabel=\"daily downloads\", ylim=(0, 2e7))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From ca26fe5f9edec519788c276a09eaff33ecd87434 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Mon, 5 Aug 2024 10:37:27 -0700 Subject: [PATCH 09/10] feat: Allow windowing in 'partial' ordering mode (#861) --- bigframes/core/__init__.py | 26 +++++++++-- bigframes/core/blocks.py | 4 ++ bigframes/core/compile/compiled.py | 2 +- bigframes/core/groupby/__init__.py | 38 ++++++++-------- bigframes/core/indexes/base.py | 8 ++-- bigframes/core/nodes.py | 42 +++++++++++++++++ bigframes/core/validations.py | 17 ++++++- bigframes/dataframe.py | 54 +++++++++++----------- bigframes/exceptions.py | 4 ++ bigframes/series.py | 52 ++++++++++----------- bigframes/session/__init__.py | 5 ++ tests/system/small/test_dataframe.py | 2 +- tests/system/small/test_unordered.py | 68 +++++++++++++++------------- 13 files changed, 205 insertions(+), 117 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index aa66129572..2e9b5fa994 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -194,8 +194,17 @@ def promote_offsets(self, col_id: str) -> ArrayValue: """ Convenience function to promote copy of column offsets to a value column. Can be used to reset index. 
""" - if self.node.order_ambiguous and not self.session._strictly_ordered: - raise ValueError("Generating offsets not supported in unordered mode") + if self.node.order_ambiguous and not (self.session._strictly_ordered): + if not self.session._allows_ambiguity: + raise ValueError( + "Generating offsets not supported in partial ordering mode" + ) + else: + warnings.warn( + "Window ordering may be ambiguous, this can cause unstable results.", + bigframes.exceptions.AmbiguousWindowWarning, + ) + return ArrayValue(nodes.PromoteOffsetsNode(child=self.node, col_id=col_id)) def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue: @@ -347,9 +356,16 @@ def project_window_op( # TODO: Support non-deterministic windowing if window_spec.row_bounded or not op.order_independent: if self.node.order_ambiguous and not self.session._strictly_ordered: - raise ValueError( - "Order-dependent windowed ops not supported in unordered mode" - ) + if not self.session._allows_ambiguity: + raise ValueError( + "Generating offsets not supported in partial ordering mode" + ) + else: + warnings.warn( + "Window ordering may be ambiguous, this can cause unstable results.", + bigframes.exceptions.AmbiguousWindowWarning, + ) + return ArrayValue( nodes.WindowOpNode( child=self.node, diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 1b7b231403..65a89b4516 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -280,6 +280,10 @@ def index_name_to_col_id(self) -> typing.Mapping[Label, typing.Sequence[str]]: mapping[label] = (*mapping.get(label, ()), id) return mapping + @property + def explicitly_ordered(self) -> bool: + return self.expr.node.explicitly_ordered + def cols_matching_label(self, partial_label: Label) -> typing.Sequence[str]: """ Unlike label_to_col_id, this works with partial labels for multi-index. 
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index c822dd331c..538789f9d7 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -263,7 +263,7 @@ def to_sql( ordered: bool = False, ) -> str: if offset_column or ordered: - raise ValueError("Cannot produce sorted sql in unordered mode") + raise ValueError("Cannot produce sorted sql in partial ordering mode") sql = ibis_bigquery.Backend().compile( self._to_ibis_expr( col_id_overrides=col_id_overrides, diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index 02bf201ca0..2b80d0389e 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -109,7 +109,7 @@ def __getitem__( dropna=self._dropna, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def head(self, n: int = 5) -> df.DataFrame: block = self._block if self._dropna: @@ -235,25 +235,25 @@ def count(self) -> df.DataFrame: def nunique(self) -> df.DataFrame: return self._aggregate_all(agg_ops.nunique_op) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: if not numeric_only: self._raise_on_non_numeric("cumsum") return self._apply_window_op(agg_ops.sum_op, numeric_only=True) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.min_op, numeric_only=numeric_only) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.max_op, numeric_only=numeric_only) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self, *args, **kwargs) -> df.DataFrame: return self._apply_window_op(agg_ops.product_op, 
numeric_only=True) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -262,7 +262,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -271,7 +271,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. window_spec = window_specs.rows( @@ -287,7 +287,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: block, window_spec, self._selected_cols, drop_null_groups=self._dropna ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), @@ -532,7 +532,7 @@ def __init__( def _session(self) -> core.Session: return self._block.session - @validations.requires_strict_ordering() + @validations.requires_ordering() def head(self, n: int = 5) -> series.Series: block = self._block if self._dropna: @@ -650,31 +650,31 @@ def agg(self, func=None) -> typing.Union[df.DataFrame, series.Series]: aggregate = agg - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.sum_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self, *args, **kwargs) -> series.Series: return self._apply_window_op( 
agg_ops.product_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.max_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self, *args, **kwargs) -> series.Series: return self._apply_window_op( agg_ops.min_op, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumcount(self, *args, **kwargs) -> series.Series: return ( self._apply_window_op( @@ -684,7 +684,7 @@ def cumcount(self, *args, **kwargs) -> series.Series: - 1 ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods=1) -> series.Series: """Shift index by desired number of periods.""" window = window_specs.rows( @@ -694,7 +694,7 @@ def shift(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods=1) -> series.Series: window = window_specs.rows( grouping_keys=tuple(self._by_col_ids), @@ -703,7 +703,7 @@ def diff(self, periods=1) -> series.Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window=window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> windows.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = window_specs.rows( @@ -723,7 +723,7 @@ def rolling(self, window: int, min_periods=None) -> windows.Window: is_series=True, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> windows.Window: window_spec = window_specs.cumulative_rows( grouping_keys=tuple(self._by_col_ids), diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index 8b039707c2..0376e37f96 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -184,7 +184,7 @@ def empty(self) -> bool: return self.shape[0] == 0 @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_increasing(self) -> bool: """ Return a boolean if the values are equal or increasing. @@ -198,7 +198,7 @@ def is_monotonic_increasing(self) -> bool: ) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_decreasing(self) -> bool: """ Return a boolean if the values are equal or decreasing. @@ -348,7 +348,7 @@ def max(self) -> typing.Any: def min(self) -> typing.Any: return self._apply_aggregation(agg_ops.min_op) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -361,7 +361,7 @@ def argmax(self) -> int: return typing.cast(int, series.Series(block.select_column(row_nums)).iloc[0]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index a979e07972..30edc7740a 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -135,6 +135,14 @@ def order_ambiguous(self) -> bool: """ ... 
+ @property + @abc.abstractmethod + def explicitly_ordered(self) -> bool: + """ + Whether row ordering has been explicitly provided. For example, a table read with no total ordering columns is not explicitly ordered. + """ + ... + @functools.cached_property def total_variables(self) -> int: return self.variables_introduced + sum( @@ -180,6 +188,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def schema(self) -> schemata.ArraySchema: return self.child.schema + @property + def explicitly_ordered(self) -> bool: + return self.child.explicitly_ordered + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -212,6 +224,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def order_ambiguous(self) -> bool: return True + @property + def explicitly_ordered(self) -> bool: + return False + def __hash__(self): return self._node_hash @@ -267,6 +283,10 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def order_ambiguous(self) -> bool: return any(child.order_ambiguous for child in self.children) + @property + def explicitly_ordered(self) -> bool: + return all(child.explicitly_ordered for child in self.children) + def __hash__(self): return self._node_hash @@ -317,6 +337,10 @@ def variables_introduced(self) -> int: def order_ambiguous(self) -> bool: return False + @property + def explicitly_ordered(self) -> bool: + return True + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -378,6 +402,10 @@ def relation_ops_created(self) -> int: def order_ambiguous(self) -> bool: return len(self.total_order_cols) == 0 + @property + def explicitly_ordered(self) -> bool: + return len(self.total_order_cols) > 0 + @functools.cached_property def variables_introduced(self) -> int: return len(self.schema.items) + 1 @@ -449,6 +477,12 @@ def hidden_columns(self) -> typing.Tuple[str, ...]: def order_ambiguous(self) -> bool: return not isinstance(self.ordering, orderings.TotalOrdering) +
@property + def explicitly_ordered(self) -> bool: + return (self.ordering is not None) and len( + self.ordering.all_ordering_columns + ) > 0 + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -523,6 +557,10 @@ def relation_ops_created(self) -> int: # Doesnt directly create any relational operations return 0 + @property + def explicitly_ordered(self) -> bool: + return True + @dataclass(frozen=True) class ReversedNode(UnaryNode): @@ -636,6 +674,10 @@ def variables_introduced(self) -> int: def order_ambiguous(self) -> bool: return False + @property + def explicitly_ordered(self) -> bool: + return True + @dataclass(frozen=True) class WindowOpNode(UnaryNode): diff --git a/bigframes/core/validations.py b/bigframes/core/validations.py index c5761f4e09..9c03ddb930 100644 --- a/bigframes/core/validations.py +++ b/bigframes/core/validations.py @@ -24,6 +24,7 @@ if TYPE_CHECKING: from bigframes import Session + from bigframes.core.blocks import Block class HasSession(Protocol): @@ -31,8 +32,12 @@ class HasSession(Protocol): def _session(self) -> Session: ... + @property + def _block(self) -> Block: + ... + -def requires_strict_ordering(suggestion: Optional[str] = None): +def requires_ordering(suggestion: Optional[str] = None): def decorator(meth): @functools.wraps(meth) def guarded_meth(object: HasSession, *args, **kwargs): @@ -47,8 +52,16 @@ def guarded_meth(object: HasSession, *args, **kwargs): def enforce_ordered( object: HasSession, opname: str, suggestion: Optional[str] = None ) -> None: - if not object._session._strictly_ordered: + session = object._session + if session._strictly_ordered or not object._block.expr.node.order_ambiguous: + # No ambiguity for how to calculate ordering, so no error or warning + return None + if not session._allows_ambiguity: suggestion_substr = suggestion + " " if suggestion else "" raise bigframes.exceptions.OrderRequiredError( f"Op {opname} not supported when strict ordering is disabled. 
{suggestion_substr}{bigframes.constants.FEEDBACK_LINK}" ) + if not object._block.explicitly_ordered: + raise bigframes.exceptions.OrderRequiredError( + f"Op {opname} requires an ordering. Use .sort_values or .sort_index to provide an ordering. {bigframes.constants.FEEDBACK_LINK}" + ) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9d3b153d3a..649b097e92 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -282,12 +282,12 @@ def loc(self) -> indexers.LocDataFrameIndexer: return indexers.LocDataFrameIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iloc(self) -> indexers.ILocDataFrameIndexer: return indexers.ILocDataFrameIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iat(self) -> indexers.IatDataFrameIndexer: return indexers.IatDataFrameIndexer(self) @@ -344,12 +344,12 @@ def _has_index(self) -> bool: return len(self._block.index_columns) > 0 @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def T(self) -> DataFrame: return DataFrame(self._get_block().transpose()) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def transpose(self) -> DataFrame: return self.T @@ -1296,11 +1296,11 @@ def _compute_dry_run(self) -> bigquery.QueryJob: def copy(self) -> DataFrame: return DataFrame(self._block) - @validations.requires_strict_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) + @validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[:n]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def tail(self, n: int = 5) -> DataFrame: return typing.cast(DataFrame, self.iloc[-n:]) @@ -1540,7 +1540,7 @@ def rename_axis( labels = [mapper] return DataFrame(self._block.with_index_labels(labels)) - @validations.requires_strict_ordering() + 
@validations.requires_ordering() def equals(self, other: typing.Union[bigframes.series.Series, DataFrame]) -> bool: # Must be same object type, same column dtypes, and same label values if not isinstance(other, DataFrame): @@ -1938,7 +1938,7 @@ def _reindex_columns(self, columns): def reindex_like(self, other: DataFrame, *, validate: typing.Optional[bool] = None): return self.reindex(index=other.index, columns=other.columns, validate=validate) - @validations.requires_strict_ordering() + @validations.requires_ordering() @requires_index def interpolate(self, method: str = "linear") -> DataFrame: if method == "pad": @@ -1964,12 +1964,12 @@ def replace( lambda x: x.replace(to_replace=to_replace, value=value, regex=regex) ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> DataFrame: window = window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) @@ -2235,16 +2235,16 @@ def agg( aggregate.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.agg) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def idxmin(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmin(self._block)) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def idxmax(self) -> bigframes.series.Series: return bigframes.series.Series(block_ops.idxmax(self._block)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def melt( self, id_vars: typing.Optional[typing.Iterable[typing.Hashable]] = None, @@ -2349,7 +2349,7 @@ def _pivot( return DataFrame(pivot_block) 
@requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def pivot( self, *, @@ -2364,7 +2364,7 @@ def pivot( return self._pivot(columns=columns, index=index, values=values) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def pivot_table( self, values: typing.Optional[ @@ -2464,7 +2464,7 @@ def _stack_multi(self, level: LevelsType = -1): return DataFrame(block) @requires_index - @validations.requires_strict_ordering() + @validations.requires_ordering() def unstack(self, level: LevelsType = -1): if not utils.is_list_like(level): level = [level] @@ -2675,7 +2675,7 @@ def _perform_join_by_index( block, _ = self._block.join(other._block, how=how, block_identity_join=True) return DataFrame(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_def = window_spec.rows( @@ -2685,7 +2685,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_def, self._block.value_columns ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window = window_spec.cumulative_rows(min_periods=min_periods) return bigframes.core.window.Window( @@ -2788,7 +2788,7 @@ def notna(self) -> DataFrame: notnull = notna notnull.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.notna) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self): is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2801,7 +2801,7 @@ def cumsum(self): window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self) -> DataFrame: is_numeric_types = [ (dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE) @@ -2814,21 +2814,21 @@ def cumprod(self) -> DataFrame: window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self) -> DataFrame: return self._apply_window_op( agg_ops.min_op, window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self) -> DataFrame: return self._apply_window_op( agg_ops.max_op, window_spec.cumulative_rows(), ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 0 else None, @@ -2836,7 +2836,7 @@ def shift(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods: int = 1) -> DataFrame: window = window_spec.rows( preceding=periods if periods > 
0 else None, @@ -2844,7 +2844,7 @@ def diff(self, periods: int = 1) -> DataFrame: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def pct_change(self, periods: int = 1) -> DataFrame: # Future versions of pandas will not perfrom ffill automatically df = self.ffill() @@ -2862,7 +2862,7 @@ def _apply_window_op( ) return DataFrame(block.select_columns(result_ids)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def sample( self, n: Optional[int] = None, @@ -3678,7 +3678,7 @@ def _optimize_query_complexity(self): _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") - @validations.requires_strict_ordering() + @validations.requires_ordering() def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: if not isinstance(other, (DataFrame, bf_series.Series)): raise NotImplementedError( diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index b1af96c9c4..00abb887b0 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -63,5 +63,9 @@ class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" +class AmbiguousWindowWarning(Warning): + """A query may produce nondeterministic results as the window may be ambiguously ordered.""" + + class UnknownDataTypeWarning(Warning): """Data type is unknown.""" diff --git a/bigframes/series.py b/bigframes/series.py index 9e33801834..d41553d0d7 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -93,12 +93,12 @@ def loc(self) -> bigframes.core.indexers.LocSeriesIndexer: return bigframes.core.indexers.LocSeriesIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iloc(self) -> bigframes.core.indexers.IlocSeriesIndexer: return bigframes.core.indexers.IlocSeriesIndexer(self) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def iat(self) -> 
bigframes.core.indexers.IatSeriesIndexer: return bigframes.core.indexers.IatSeriesIndexer(self) @@ -163,7 +163,7 @@ def struct(self) -> structs.StructAccessor: return structs.StructAccessor(self._block) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def T(self) -> Series: return self.transpose() @@ -175,7 +175,7 @@ def _info_axis(self) -> indexes.Index: def _session(self) -> bigframes.Session: return self._get_block().expr.session - @validations.requires_strict_ordering() + @validations.requires_ordering() def transpose(self) -> Series: return self @@ -271,7 +271,7 @@ def equals( return False return block_ops.equals(self._block, other._block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def reset_index( self, *, @@ -459,13 +459,13 @@ def case_when(self, caselist) -> Series: ignore_self=True, ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumsum(self) -> Series: return self._apply_window_op( agg_ops.sum_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def ffill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=limit, following=0) return self._apply_window_op(agg_ops.LastNonNullOp(), window) @@ -473,30 +473,30 @@ def ffill(self, *, limit: typing.Optional[int] = None) -> Series: pad = ffill pad.__doc__ = inspect.getdoc(vendored_pandas_series.Series.ffill) - @validations.requires_strict_ordering() + @validations.requires_ordering() def bfill(self, *, limit: typing.Optional[int] = None) -> Series: window = bigframes.core.window_spec.rows(preceding=0, following=limit) return self._apply_window_op(agg_ops.FirstNonNullOp(), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cummax(self) -> Series: return self._apply_window_op( agg_ops.max_op, bigframes.core.window_spec.cumulative_rows() ) - 
@validations.requires_strict_ordering() + @validations.requires_ordering() def cummin(self) -> Series: return self._apply_window_op( agg_ops.min_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def cumprod(self) -> Series: return self._apply_window_op( agg_ops.product_op, bigframes.core.window_spec.cumulative_rows() ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def shift(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -504,7 +504,7 @@ def shift(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.ShiftOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def diff(self, periods: int = 1) -> Series: window = bigframes.core.window_spec.rows( preceding=periods if periods > 0 else None, @@ -512,13 +512,13 @@ def diff(self, periods: int = 1) -> Series: ) return self._apply_window_op(agg_ops.DiffOp(periods), window) - @validations.requires_strict_ordering() + @validations.requires_ordering() def pct_change(self, periods: int = 1) -> Series: # Future versions of pandas will not perfrom ffill automatically series = self.ffill() return Series(block_ops.pct_change(series._block, periods=periods)) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rank( self, axis=0, @@ -610,7 +610,7 @@ def _mapping_replace(self, mapping: dict[typing.Hashable, typing.Hashable]): ) return Series(block.select_column(result)) - @validations.requires_strict_ordering() + @validations.requires_ordering() @requires_index def interpolate(self, method: str = "linear") -> Series: if method == "pad": @@ -633,11 +633,11 @@ def dropna( result = result.reset_index() return Series(result) - @validations.requires_strict_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) + 
@validations.requires_ordering(bigframes.constants.SUGGEST_PEEK_PREVIEW) def head(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[0:n]) - @validations.requires_strict_ordering() + @validations.requires_ordering() def tail(self, n: int = 5) -> Series: return typing.cast(Series, self.iloc[-n:]) @@ -1138,7 +1138,7 @@ def clip(self, lower, upper): ) return Series(block.select_column(result_id).with_column_labels([self.name])) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmax(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1151,7 +1151,7 @@ def argmax(self) -> int: scalars.Scalar, Series(block.select_column(row_nums)).iloc[0] ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def argmin(self) -> int: block, row_nums = self._block.promote_offsets() block = block.order_by( @@ -1217,14 +1217,14 @@ def idxmin(self) -> blocks.Label: return indexes.Index(block).to_pandas()[0] @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_increasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_increasing(self._value_column) ) @property - @validations.requires_strict_ordering() + @validations.requires_ordering() def is_monotonic_decreasing(self) -> bool: return typing.cast( bool, self._block.is_monotonic_decreasing(self._value_column) @@ -1332,7 +1332,7 @@ def sort_index(self, *, axis=0, ascending=True, na_position="last") -> Series: block = block.order_by(ordering) return Series(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. 
window_spec = bigframes.core.window_spec.rows( @@ -1342,7 +1342,7 @@ def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window self._block, window_spec, self._block.value_columns, is_series=True ) - @validations.requires_strict_ordering() + @validations.requires_ordering() def expanding(self, min_periods: int = 1) -> bigframes.core.window.Window: window_spec = bigframes.core.window_spec.cumulative_rows( min_periods=min_periods @@ -1615,7 +1615,7 @@ def drop_duplicates(self, *, keep: str = "first") -> Series: block = block_ops.drop_duplicates(self._block, (self._value_column,), keep) return Series(block) - @validations.requires_strict_ordering() + @validations.requires_ordering() def unique(self) -> Series: return self.drop_duplicates() @@ -1806,7 +1806,7 @@ def map( result_df = self_df.join(map_df, on="series") return result_df[self.name] - @validations.requires_strict_ordering() + @validations.requires_ordering() def sample( self, n: Optional[int] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f449b52fbf..dc1da488a1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -314,6 +314,7 @@ def __init__( self._compiler = bigframes.core.compile.SQLCompiler( strict=self._strictly_ordered ) + self._allow_ambiguity = not self._strictly_ordered self._remote_function_session = bigframes_rf._RemoteFunctionSession() @@ -378,6 +379,10 @@ def slot_millis_sum(self): """The sum of all slot time used by bigquery jobs in this session.""" return self._slot_millis_sum + @property + def _allows_ambiguity(self) -> bool: + return self._allow_ambiguity + def _add_bytes_processed(self, amount: int): """Increment bytes_processed_sum by amount.""" self._bytes_processed_sum += amount diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 3a7eff621f..d838251dca 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2273,7 
+2273,7 @@ def test_series_binop_add_different_table( def test_join_same_table(scalars_dfs_maybe_ordered, how): bf_df, pd_df = scalars_dfs_maybe_ordered if not bf_df._session._strictly_ordered and how == "cross": - pytest.skip("Cross join not supported in unordered mode.") + pytest.skip("Cross join not supported in partial ordering mode.") bf_df_a = bf_df.set_index("int64_too")[["string_col", "int64_col"]] bf_df_a = bf_df_a.sort_index() diff --git a/tests/system/small/test_unordered.py b/tests/system/small/test_unordered.py index 7d7097ceb3..9f85ec99f9 100644 --- a/tests/system/small/test_unordered.py +++ b/tests/system/small/test_unordered.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import warnings + import pandas as pd import pyarrow as pa import pytest @@ -99,7 +101,6 @@ def test_unordered_mode_read_gbq(unordered_session): [ pytest.param( "first", - marks=pytest.mark.xfail(raises=bigframes.exceptions.OrderRequiredError), ), pytest.param( False, @@ -138,37 +139,6 @@ def test_unordered_merge(unordered_session): assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True) -@pytest.mark.parametrize( - ("function"), - [ - pytest.param( - lambda x: x.cumsum(), - id="cumsum", - ), - pytest.param( - lambda x: x.idxmin(), - id="idxmin", - ), - pytest.param( - lambda x: x.a.iloc[1::2], - id="series_iloc", - ), - pytest.param( - lambda x: x.head(3), - id="head", - ), - ], -) -def test_unordered_mode_blocks_windowing(unordered_session, function): - pd_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, dtype=pd.Int64Dtype()) - df = bpd.DataFrame(pd_df, session=unordered_session) - with pytest.raises( - bigframes.exceptions.OrderRequiredError, - match=r"Op.*not supported when strict ordering is disabled", - ): - function(df) - - def test_unordered_mode_cache_preserves_order(unordered_session): pd_df = 
pd.DataFrame( {"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() @@ -181,3 +151,37 @@ def test_unordered_mode_cache_preserves_order(unordered_session): # B is unique so unstrict order mode result here should be equivalent to strictly ordered assert_pandas_df_equal(bf_result, pd_result, ignore_order=False) + + +def test_unordered_mode_no_ordering_error(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with pytest.raises(bigframes.exceptions.OrderRequiredError): + df.merge(df, on="a").head(3) + + +def test_unordered_mode_ambiguity_warning(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with pytest.warns(bigframes.exceptions.AmbiguousWindowWarning): + df.merge(df, on="a").sort_values("b_x").head(3) + + +def test_unordered_mode_no_ambiguity_warning(unordered_session): + pd_df = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 1], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype() + ) + pd_df.index = pd_df.index.astype(pd.Int64Dtype()) + df = bpd.DataFrame(pd_df, session=unordered_session) + + with warnings.catch_warnings(): + warnings.simplefilter("error") + df.groupby("a").head(3) From 5317327f8bf7751688f3ad4cc0c96f719cf2b062 Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:43:08 -0700 Subject: [PATCH 10/10] chore(main): release 1.13.0 (#876) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 20 ++++++++++++++++++++ bigframes/version.py | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
354c356c7c..3209391f44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,26 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.13.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.12.0...v1.13.0) (2024-08-05) + + +### Features + +* `df.apply(axis=1)` to support remote function with multiple params ([#851](https://github.com/googleapis/python-bigquery-dataframes/issues/851)) ([2158818](https://github.com/googleapis/python-bigquery-dataframes/commit/2158818e53e09e55c87ffd574e3ebc2e201285fb)) +* Allow windowing in 'partial' ordering mode ([#861](https://github.com/googleapis/python-bigquery-dataframes/issues/861)) ([ca26fe5](https://github.com/googleapis/python-bigquery-dataframes/commit/ca26fe5f9edec519788c276a09eaff33ecd87434)) +* Create a separate OrderingModePartialPreviewWarning for more fine-grained warning filters ([#879](https://github.com/googleapis/python-bigquery-dataframes/issues/879)) ([8753bdd](https://github.com/googleapis/python-bigquery-dataframes/commit/8753bdd1e44701e56eae914ebc0e91d9b1a6adf1)) + + +### Bug Fixes + +* Fix issue with invalid sql generated by ml distance functions ([#865](https://github.com/googleapis/python-bigquery-dataframes/issues/865)) ([9959fc8](https://github.com/googleapis/python-bigquery-dataframes/commit/9959fc8fcba93441fdd3d9c17e8fdbe6e6a7b504)) + + +### Documentation + +* Create sample notebook using `ordering_mode="partial"` ([#880](https://github.com/googleapis/python-bigquery-dataframes/issues/880)) ([c415eb9](https://github.com/googleapis/python-bigquery-dataframes/commit/c415eb91eb71dea53d245ba2bce416062e3f02f8)) +* Update streaming notebook ([#875](https://github.com/googleapis/python-bigquery-dataframes/issues/875)) ([e9b0557](https://github.com/googleapis/python-bigquery-dataframes/commit/e9b05571123cf13079772856317ca3cd3d564c5a)) + ## [1.12.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.11.1...v1.12.0) (2024-07-31) diff --git a/bigframes/version.py
b/bigframes/version.py index 29cf036f42..b474f021d4 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.12.0" +__version__ = "1.13.0"