From 518d315487f351c227070c0127382d11381c5e88 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Mon, 22 Apr 2024 17:15:33 -0700 Subject: [PATCH 01/18] feat: Series binary ops compatible with more types (#618) --- bigframes/core/__init__.py | 12 -- bigframes/core/compile/scalar_op_compiler.py | 2 +- bigframes/core/convert.py | 29 ++++- bigframes/dataframe.py | 7 +- bigframes/operations/base.py | 128 ++++++++++++------- bigframes/series.py | 9 -- tests/system/small/test_series.py | 112 ++++++++++++---- 7 files changed, 200 insertions(+), 99 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 9e6b86fc30..04291edbb1 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -117,18 +117,6 @@ def _compiled_schema(self) -> schemata.ArraySchema: ) return schemata.ArraySchema(items) - def validate_schema(self): - tree_derived = self.node.schema - ibis_derived = self._compiled_schema - if tree_derived.names != ibis_derived.names: - raise ValueError( - f"Unexpected names internal {tree_derived.names} vs compiled {ibis_derived.names}" - ) - if tree_derived.dtypes != ibis_derived.dtypes: - raise ValueError( - f"Unexpected types internal {tree_derived.dtypes} vs compiled {ibis_derived.dtypes}" - ) - def _try_evaluate_local(self): """Use only for unit testing paths - not fully featured. Will throw exception if fails.""" import ibis diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index 072d974b39..a65ff6fe0c 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -1366,7 +1366,7 @@ def clip_op( @scalar_op_compiler.register_nary_op(ops.case_when_op) -def switch_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: +def case_when_op(*cases_and_outputs: ibis_types.Value) -> ibis_types.Value: # ibis can handle most type coercions, but we need to force bool -> int # TODO: dispatch coercion depending on bigframes dtype schema result_values = cases_and_outputs[1::2] diff --git a/bigframes/core/convert.py b/bigframes/core/convert.py index 1ef329b0c7..7bfca82779 100644 --- a/bigframes/core/convert.py +++ b/bigframes/core/convert.py @@ -21,7 +21,22 @@ import bigframes.series as series -def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series: +def is_series_convertible(obj) -> bool: + if isinstance(obj, series.Series): + return True + if isinstance(obj, pd.Series): + return True + if isinstance(obj, index.Index): + return True + if isinstance(obj, pd.Index): + return True + if pd.api.types.is_list_like(obj): + return True + else: + return False + + +def to_bf_series(obj, default_index: Optional[index.Index], session) -> series.Series: """ Convert a an object to a bigframes series @@ -37,13 +52,15 @@ def to_bf_series(obj, default_index: Optional[index.Index]) -> series.Series: if isinstance(obj, series.Series): return obj if isinstance(obj, pd.Series): - return series.Series(obj) + return series.Series(obj, session=session) if isinstance(obj, index.Index): - return series.Series(obj, default_index) + return series.Series(obj, default_index, session=session) if isinstance(obj, pd.Index): - return series.Series(obj, default_index) + return series.Series(obj, default_index, session=session) + if pd.api.types.is_dict_like(obj): + return series.Series(obj, session=session) if pd.api.types.is_list_like(obj): - return series.Series(obj, default_index) + return series.Series(obj, default_index, session=session) else: raise 
TypeError(f"Cannot interpret {obj} as series.") @@ -69,6 +86,8 @@ def to_pd_series(obj, default_index: pd.Index) -> pd.Series: return pd.Series(obj.to_pandas(), default_index) if isinstance(obj, pd.Index): return pd.Series(obj, default_index) + if pd.api.types.is_dict_like(obj): + return pd.Series(obj) if pd.api.types.is_list_like(obj): return pd.Series(obj, default_index) else: diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index ff8404761c..4f9ee44f09 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -105,6 +105,9 @@ def __init__( raise ValueError( f"DataFrame constructor only supports copy=True. {constants.FEEDBACK_LINK}" ) + # just ignore object dtype if provided + if dtype in {numpy.dtypes.ObjectDType, "object"}: + dtype = None # Check to see if constructing from BigQuery-backed objects before # falling back to pandas constructor @@ -668,7 +671,9 @@ def _apply_binop( DataFrame(other), op, how=how, reverse=reverse ) elif utils.get_axis_number(axis) == 0: - bf_series = bigframes.core.convert.to_bf_series(other, self.index) + bf_series = bigframes.core.convert.to_bf_series( + other, self.index, self._session + ) return self._apply_series_binop_axis_0(bf_series, op, how, reverse) elif utils.get_axis_number(axis) == 1: pd_series = bigframes.core.convert.to_pd_series(other, self.columns) diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 9bfa0500b5..b003ce59cc 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -17,10 +17,12 @@ import typing import bigframes_vendored.pandas.pandas._typing as vendored_pandas_typing +import numpy import pandas as pd import bigframes.constants as constants import bigframes.core.blocks as blocks +import bigframes.core.convert import bigframes.core.expression as ex import bigframes.core.indexes as indexes import bigframes.core.scalar as scalars @@ -44,7 +46,19 @@ def __init__( *, session: typing.Optional[bigframes.session.Session] = None, ): - block = None + import bigframes.pandas + + # just ignore object dtype if provided + if dtype in {numpy.dtypes.ObjectDType, "object"}: + dtype = None + + read_pandas_func = ( + session.read_pandas + if (session is not None) + else (lambda x: bigframes.pandas.read_pandas(x)) + ) + + block: typing.Optional[blocks.Block] = None if copy is not None and not copy: raise ValueError( f"Series constructor only supports copy=True. 
{constants.FEEDBACK_LINK}" @@ -55,58 +69,75 @@ def __init__( assert index is None block = data - elif isinstance(data, SeriesMethods): - block = data._block + # interpret these cases as both index and data + elif ( + isinstance(data, SeriesMethods) + or isinstance(data, pd.Series) + or pd.api.types.is_dict_like(data) + ): + if isinstance(data, pd.Series): + data = read_pandas_func(data) + elif pd.api.types.is_dict_like(data): + data = read_pandas_func(pd.Series(data, dtype=dtype)) # type: ignore + dtype = None + data_block = data._block if index is not None: # reindex - bf_index = indexes.Index(index) + bf_index = indexes.Index(index, session=session) idx_block = bf_index._block idx_cols = idx_block.value_columns - block_idx, _ = idx_block.join(block, how="left") - block = block_idx.with_index_labels(bf_index.names) - - elif isinstance(data, indexes.Index): + block_idx, _ = idx_block.join(data_block, how="left") + data_block = block_idx.with_index_labels(bf_index.names) + block = data_block + + # list-like data that will get default index + elif isinstance(data, indexes.Index) or pd.api.types.is_list_like(data): + data = indexes.Index(data, dtype=dtype, session=session) + dtype = ( + None # set to none as it has already been applied, avoid re-cast later + ) if data.nlevels != 1: raise NotImplementedError("Cannot interpret multi-index as Series.") # Reset index to promote index columns to value columns, set default index - block = data._block.reset_index(drop=False) + data_block = data._block.reset_index(drop=False).with_column_labels( + data.names + ) if index is not None: # Align by offset - bf_index = indexes.Index(index) - idx_block = bf_index._block.reset_index(drop=False) + bf_index = indexes.Index(index, session=session) + idx_block = bf_index._block.reset_index( + drop=False + ) # reset to align by offsets, and then reset back idx_cols = idx_block.value_columns - block, (l_mapping, _) = idx_block.join(block, how="left") - block = block.set_index([l_mapping[col] for col in idx_cols]) - block = block.with_index_labels(bf_index.names) - - if block: - if name: - if not isinstance(name, typing.Hashable): - raise ValueError( - f"BigQuery DataFrames only supports hashable series names. 
{constants.FEEDBACK_LINK}" - ) - block = block.with_column_labels([name]) - if dtype: - block = block.multi_apply_unary_op( - block.value_columns, ops.AsTypeOp(to_type=dtype) - ) - else: - import bigframes.pandas + data_block, (l_mapping, _) = idx_block.join(data_block, how="left") + data_block = data_block.set_index([l_mapping[col] for col in idx_cols]) + data_block = data_block.with_index_labels(bf_index.names) + block = data_block - pd_series = pd.Series( - data=data, index=index, dtype=dtype, name=name # type:ignore - ) - pd_dataframe = pd_series.to_frame() - if pd_series.name is None: - # to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename - pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1) - if session: - block = session.read_pandas(pd_dataframe)._get_block() + else: # Scalar case + if index is not None: + bf_index = indexes.Index(index, session=session) else: - # Uses default global session - block = bigframes.pandas.read_pandas(pd_dataframe)._get_block() - if pd_series.name is None: - block = block.with_column_labels([None]) + bf_index = indexes.Index( + [] if (data is None) else [0], + session=session, + dtype=bigframes.dtypes.INT_DTYPE, + ) + block, _ = bf_index._block.create_constant(data, dtype) + dtype = None + block = block.with_column_labels([name]) + + assert block is not None + if name: + if not isinstance(name, typing.Hashable): + raise ValueError( + f"BigQuery DataFrames only supports hashable series names. {constants.FEEDBACK_LINK}" + ) + block = block.with_column_labels([name]) + if dtype: + block = block.multi_apply_unary_op( + block.value_columns, ops.AsTypeOp(to_type=dtype) + ) self._block: blocks.Block = block @property @@ -145,17 +176,16 @@ def _apply_binary_op( reverse: bool = False, ) -> series.Series: """Applies a binary operator to the series and other.""" - if isinstance(other, pd.Series): - # TODO: Convert to BigQuery DataFrames series - raise NotImplementedError( - f"Pandas series not supported as operand. {constants.FEEDBACK_LINK}" + if bigframes.core.convert.is_series_convertible(other): + self_index = indexes.Index(self._block) + other_series = bigframes.core.convert.to_bf_series( + other, self_index, self._block.session ) - if isinstance(other, series.Series): - (self_col, other_col, block) = self._align(other, how=alignment) + (self_col, other_col, block) = self._align(other_series, how=alignment) name = self._name if ( - isinstance(other, series.Series) + hasattr(other, "name") and other.name != self._name and alignment == "outer" ): @@ -166,7 +196,7 @@ def _apply_binary_op( block, result_id = block.project_expr(expr, name) return series.Series(block.select_column(result_id)) - else: + else: # Scalar binop name = self._name expr = op.as_expr( ex.const(other) if reverse else self._value_column, diff --git a/bigframes/series.py b/bigframes/series.py index 47acfd0afb..5184d4bf1d 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -20,7 +20,6 @@ import inspect import itertools import numbers -import os import textwrap import typing from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union @@ -73,11 +72,6 @@ def __init__(self, *args, **kwargs): self._query_job: Optional[bigquery.QueryJob] = None super().__init__(*args, **kwargs) - # Runs strict validations to ensure internal type predictions and ibis are completely in sync - # Do not execute these validations outside of testing suite. 
- if "PYTEST_CURRENT_TEST" in os.environ: - self._block.expr.validate_schema() - @property def dt(self) -> dt.DatetimeMethods: return dt.DatetimeMethods(self._block) @@ -812,9 +806,6 @@ def combine_first(self, other: Series) -> Series: return result def update(self, other: Union[Series, Sequence, Mapping]) -> None: - import bigframes.core.convert - - other = bigframes.core.convert.to_bf_series(other, default_index=None) result = self._apply_binary_op( other, ops.coalesce_op, reverse=True, alignment="left" ) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index 9cb615fdcb..38aed19f05 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -126,6 +126,75 @@ def test_series_construct_from_list(): pd.testing.assert_series_equal(bf_result, pd_result) +def test_series_construct_reindex(): + bf_result = series.Series( + series.Series({1: 10, 2: 30, 3: 30}), index=[3, 2], dtype="Int64" + ).to_pandas() + pd_result = pd.Series(pd.Series({1: 10, 2: 30, 3: 30}), index=[3, 2], dtype="Int64") + + # BigQuery DataFrame default indices use nullable Int64 always + pd_result.index = pd_result.index.astype("Int64") + + pd.testing.assert_series_equal(bf_result, pd_result) + + +def test_series_construct_from_list_w_index(): + bf_result = series.Series( + [1, 1, 2, 3, 5, 8, 13], index=[10, 20, 30, 40, 50, 60, 70], dtype="Int64" + ).to_pandas() + pd_result = pd.Series( + [1, 1, 2, 3, 5, 8, 13], index=[10, 20, 30, 40, 50, 60, 70], dtype="Int64" + ) + + # BigQuery DataFrame default indices use nullable Int64 always + pd_result.index = pd_result.index.astype("Int64") + + pd.testing.assert_series_equal(bf_result, pd_result) + + +def test_series_construct_empty(session: bigframes.Session): + bf_series: series.Series = series.Series(session=session) + pd_series: pd.Series = pd.Series() + + bf_result = bf_series.empty + pd_result = pd_series.empty + + assert pd_result + assert bf_result == pd_result + + +def test_series_construct_scalar_no_index(): + bf_result = series.Series("hello world", dtype="string[pyarrow]").to_pandas() + pd_result = pd.Series("hello world", dtype="string[pyarrow]") + + # BigQuery DataFrame default indices use nullable Int64 always + pd_result.index = pd_result.index.astype("Int64") + + pd.testing.assert_series_equal(bf_result, pd_result) + + +def test_series_construct_scalar_w_index(): + bf_result = series.Series( + "hello world", dtype="string[pyarrow]", index=[0, 2, 1] + ).to_pandas() + pd_result = pd.Series("hello world", dtype="string[pyarrow]", index=[0, 2, 1]) + + # BigQuery DataFrame default indices use nullable Int64 always + pd_result.index = pd_result.index.astype("Int64") + + pd.testing.assert_series_equal(bf_result, pd_result) + + +def test_series_construct_nan(): + bf_result = series.Series(numpy.nan).to_pandas() + pd_result = pd.Series(numpy.nan) + + pd_result.index = pd_result.index.astype("Int64") + pd_result = pd_result.astype("Float64") + + pd.testing.assert_series_equal(bf_result, pd_result) + + def test_series_construct_from_list_escaped_strings(): """Check that special characters are supported.""" strings = [ @@ -949,17 +1018,6 @@ def test_reset_index_no_drop(scalars_df_index, scalars_pandas_df_index, name): pd.testing.assert_frame_equal(bf_result.to_pandas(), pd_result) -def test_series_add_pandas_series_not_implemented(scalars_dfs): - scalars_df, _ = scalars_dfs - with pytest.raises(NotImplementedError): - ( - scalars_df["float64_col"] - + pd.Series( - [1, 1, 1, 1], - ) - ).to_pandas() - - def 
test_copy(scalars_df_index, scalars_pandas_df_index): col_name = "float64_col" # Expect mutation on original not to effect_copy @@ -1269,6 +1327,27 @@ def test_binop_right_filtered(scalars_dfs): ) +@pytest.mark.parametrize( + ("other",), + [ + ([-1.4, 2.3, None],), + (pd.Index([-1.4, 2.3, None]),), + (pd.Series([-1.4, 2.3, None], index=[44, 2, 1]),), + ], +) +@skip_legacy_pandas +def test_series_binop_w_other_types(scalars_dfs, other): + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = (scalars_df["int64_col"].head(3) + other).to_pandas() + pd_result = scalars_pandas_df["int64_col"].head(3) + other + + assert_series_equal( + bf_result, + pd_result, + ) + + @skip_legacy_pandas def test_series_combine_first(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs @@ -1771,17 +1850,6 @@ def test_empty_true_row_filter(scalars_dfs): assert pd_result == bf_result -def test_empty_true_memtable(session: bigframes.Session): - bf_series: series.Series = series.Series(session=session) - pd_series: pd.Series = pd.Series() - - bf_result = bf_series.empty - pd_result = pd_series.empty - - assert pd_result - assert bf_result == pd_result - - def test_series_names(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs From 054075d448f7de1b3bc1a4631b4e2340643de4ef Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Mon, 22 Apr 2024 18:20:09 -0700 Subject: [PATCH 02/18] feat: Add transpose support for small homogeneously typed DataFrames. (#621) --- bigframes/core/blocks.py | 51 ++++++++++++ bigframes/dataframe.py | 7 ++ tests/system/small/test_dataframe.py | 23 ++++++ .../bigframes_vendored/pandas/core/frame.py | 82 +++++++++++++++++++ 4 files changed, 163 insertions(+) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0f9cacd83d..2a888125f8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -35,6 +35,7 @@ import pyarrow as pa import bigframes._config.sampling_options as sampling_options +import bigframes.constants import bigframes.constants as constants import bigframes.core as core import bigframes.core.expression as ex @@ -1542,6 +1543,10 @@ def melt( var_names=typing.Sequence[typing.Hashable], value_name: typing.Hashable = "value", ): + """ + Unpivot columns to produce longer, narrower dataframe. + Arguments correspond to pandas.melt arguments. + """ # TODO: Implement col_level and ignore_index unpivot_col_id = guid.generate_guid() var_col_ids = tuple([guid.generate_guid() for _ in var_names]) @@ -1570,6 +1575,52 @@ def melt( index_columns=[index_id], ) + def transpose(self) -> Block: + """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows""" + original_col_index = self.column_labels + original_row_index = self.index.to_pandas() + original_row_count = len(original_row_index) + if original_row_count > bigframes.constants.MAX_COLUMNS: + raise NotImplementedError( + f"Object has {original_row_count} rows and is too large to transpose." 
+ ) + + # Add row numbers to both axes to disambiguate, clean them up later + block = self + numbered_block = block.with_column_labels( + utils.combine_indices( + block.column_labels, pd.Index(range(len(block.column_labels))) + ) + ) + numbered_block, offsets = numbered_block.promote_offsets() + + stacked_block = numbered_block.melt( + id_vars=(offsets,), + var_names=( + *[name for name in original_col_index.names], + "col_offset", + ), + value_vars=block.value_columns, + ) + col_labels = stacked_block.value_columns[-2 - original_col_index.nlevels : -2] + col_offset = stacked_block.value_columns[-2] # disambiguator we created earlier + cell_values = stacked_block.value_columns[-1] + # Groupby source column + stacked_block = stacked_block.set_index( + [*col_labels, col_offset] + ) # col index is now row index + result = stacked_block.pivot( + columns=[offsets], + values=[cell_values], + columns_unique_values=tuple(range(original_row_count)), + ) + # Drop the offsets from both axes before returning + return ( + result.with_column_labels(original_row_index) + .order_by([ordering.ascending_over(result.index_columns[-1])]) + .drop_levels([result.index_columns[-1]]) + ) + def _create_stack_column( self, col_label: typing.Tuple, stack_labels: typing.Sequence[typing.Tuple] ): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 4f9ee44f09..a55b7f569b 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -314,6 +314,13 @@ def bqclient(self) -> bigframes.Session: def _session(self) -> bigframes.Session: return self._get_block().expr.session + @property + def T(self) -> DataFrame: + return DataFrame(self._get_block().transpose()) + + def transpose(self) -> DataFrame: + return self.T + def __len__(self): rows, _ = self.shape return rows diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 4c598a682d..f41a21add0 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2465,6 +2465,29 @@ def test_df_describe(scalars_dfs): ).all() +def test_df_transpose(): + # Include some floats to ensure type coercion + values = [[0, 3.5, True], [1, 4.5, False], [2, 6.5, None]] + # Test complex case of both axes being multi-indices with non-unique elements + columns = pd.Index(["A", "B", "A"], dtype=pd.StringDtype(storage="pyarrow")) + columns_multi = pd.MultiIndex.from_arrays([columns, columns], names=["c1", "c2"]) + index = pd.Index(["b", "a", "a"], dtype=pd.StringDtype(storage="pyarrow")) + rows_multi = pd.MultiIndex.from_arrays([index, index], names=["r1", "r2"]) + + pd_df = pandas.DataFrame(values, index=rows_multi, columns=columns_multi) + bf_df = dataframe.DataFrame(values, index=rows_multi, columns=columns_multi) + + pd_result = pd_df.T + bf_result = bf_df.T.to_pandas() + + pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False) + + +def test_df_transpose_error(): + with pytest.raises(TypeError, match="Cannot coerce.*to a common type."): + dataframe.DataFrame([[1, "hello"], [2, "world"]]).transpose() + + @pytest.mark.parametrize( ("ordered"), [ diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 0515f690e3..1669a291c9 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -93,6 +93,88 @@ def values(self) -> np.ndarray: """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + @property + def T(self) -> DataFrame: + """ + The 
transpose of the DataFrame. + + All columns must be the same dtype (numerics can be coerced to a common supertype). + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) + >>> df + col1 col2 + 0 1 3 + 1 2 4 + + [2 rows x 2 columns] + + >>> df.T + 0 1 + col1 1 2 + col2 3 4 + + [2 rows x 2 columns] + + Returns: + DataFrame: The transposed DataFrame. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + + def transpose(self) -> DataFrame: + """ + Transpose index and columns. + + Reflect the DataFrame over its main diagonal by writing rows as columns + and vice-versa. The property :attr:`.T` is an accessor to the method + :meth:`transpose`. + + All columns must be the same dtype (numerics can be coerced to a common supertype). + + **Examples:** + + **Square DataFrame with homogeneous dtype** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> d1 = {'col1': [1, 2], 'col2': [3, 4]} + >>> df1 = bpd.DataFrame(data=d1) + >>> df1 + col1 col2 + 0 1 3 + 1 2 4 + + [2 rows x 2 columns] + + >>> df1_transposed = df1.T # or df1.transpose() + >>> df1_transposed + 0 1 + col1 1 2 + col2 3 4 + + [2 rows x 2 columns] + + When the dtype is homogeneous in the original DataFrame, we get a + transposed DataFrame with the same dtype: + + >>> df1.dtypes + col1 Int64 + col2 Int64 + dtype: object + >>> df1_transposed.dtypes + 0 Int64 + 1 Int64 + dtype: object + + Returns: + DataFrame: The transposed DataFrame. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def info( self, verbose: bool | None = None, From 4724a1a456076d003613d2e964a8dd2d80a09ad9 Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Tue, 23 Apr 2024 09:45:12 -0700 Subject: [PATCH 03/18] fix: llm fine tuning tests (#627) --- tests/system/load/test_llm.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py index 62ef7d5c72..d56f6100c1 100644 --- a/tests/system/load/test_llm.py +++ b/tests/system/load/test_llm.py @@ -45,9 +45,12 @@ def llm_remote_text_pandas_df(): ) -def test_llm_palm_configure_fit( - llm_fine_tune_df_default_index, llm_remote_text_pandas_df -): +@pytest.fixture(scope="session") +def llm_remote_text_df(session, llm_remote_text_pandas_df): + return session.read_pandas(llm_remote_text_pandas_df) + + +def test_llm_palm_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_df): model = bigframes.ml.llm.PaLM2TextGenerator( model_name="text-bison", max_iterations=1 ) @@ -59,7 +62,7 @@ def test_llm_palm_configure_fit( assert model is not None - df = model.predict(llm_remote_text_pandas_df).to_pandas() + df = model.predict(llm_remote_text_df["prompt"]).to_pandas() assert df.shape == (3, 4) assert "ml_generate_text_llm_result" in df.columns series = df["ml_generate_text_llm_result"] From 2b84c4f173e956ba2c7fcc0ad92785ae95161d8e Mon Sep 17 00:00:00 2001 From: Salem Jorden <115185670+SalemJorden@users.noreply.github.com> Date: Tue, 23 Apr 2024 12:07:55 -0500 Subject: [PATCH 04/18] docs: add the first sample for the Single time-series forecasting from Google Analytics data tutorial (#623) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BigQuery DataFrames sample for [Step two (optional): Visualize the time series you want to 
forecast](https://cloud.google.com/bigquery/docs/arima-single-time-series-forecasting-tutorial#step_two_optional_visualize_the_time_series_you_want_to_forecast). Co-authored-by: Salem Boyland Co-authored-by: Tim Sweña (Swast) --- ...ingle_timeseries_forecasting_model_test.py | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 samples/snippets/create_single_timeseries_forecasting_model_test.py diff --git a/samples/snippets/create_single_timeseries_forecasting_model_test.py b/samples/snippets/create_single_timeseries_forecasting_model_test.py new file mode 100644 index 0000000000..5750933713 --- /dev/null +++ b/samples/snippets/create_single_timeseries_forecasting_model_test.py @@ -0,0 +1,40 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (t +# you may not use this file except in compliance wi +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in +# distributed under the License is distributed on a +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eit +# See the License for the specific language governi +# limitations under the License. + + +def test_create_single_timeseries(): + + # [START bigquery_dataframes_single_timeseries_forecasting_model_tutorial] + import bigframes.pandas as bpd + + # Start by loading the historical data from BigQuerythat you want to analyze and forecast. + # This clause indicates that you are querying the ga_sessions_* tables in the google_analytics_sample dataset. + # Read and visualize the time series you want to forecast. + df = bpd.read_gbq("bigquery-public-data.google_analytics_sample.ga_sessions_*") + parsed_date = bpd.to_datetime(df.date, format="%Y%m%d", utc=True) + visits = df["totals"].struct.field("visits") + total_visits = visits.groupby(parsed_date).sum() + + # Expected output: total_visits.head() + # date + # 2016-08-01 00:00:00+00:00 1711 + # 2016-08-02 00:00:00+00:00 2140 + # 2016-08-03 00:00:00+00:00 2890 + # 2016-08-04 00:00:00+00:00 3161 + # 2016-08-05 00:00:00+00:00 2702 + # Name: visits, dtype: Int64 + + total_visits.plot.line() + + # [END bigquery_dataframes_single_timeseries_forecasting_model_tutorial] From 8d2a51c4079844daba20f414b6c0c0ca030ba1f9 Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Wed, 24 Apr 2024 07:45:46 -0700 Subject: [PATCH 05/18] docs: add supported pandas apis on the main page (#628) --- README.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/README.rst b/README.rst index 64d1e4e72c..70041c7c8e 100644 --- a/README.rst +++ b/README.rst @@ -25,6 +25,7 @@ Documentation * `BigQuery DataFrames source code (GitHub) `_ * `BigQuery DataFrames sample notebooks `_ * `BigQuery DataFrames API reference `_ +* `BigQuery DataFrames supported pandas APIs `_ Getting started with BigQuery DataFrames From 96c150a9de9a854872d47a2c4c545c77b79096e0 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 25 Apr 2024 02:12:43 +0000 Subject: [PATCH 06/18] test: test most relevant dtype for aggregates (#595) * fix: keep most relevant dtype for aggregates * add aggregate tests for bool result * refactor and reuse dtypes.lcd_dtype * check_dtype=False --- bigframes/dtypes.py | 16 ++-- tests/system/small/test_dataframe.py | 126 +++++++++++++++++++++++---- 2 files changed, 119 insertions(+), 23 deletions(-) diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index 3b2092bf85..d2dc210e0d 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ 
-658,10 +658,14 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]: return None -def lcd_type(dtype1: Dtype, dtype2: Dtype) -> Dtype: - """Get the supertype of the two types.""" - if dtype1 == dtype2: - return dtype1 +def lcd_type(*dtypes: Dtype) -> Dtype: + if len(dtypes) < 1: + raise ValueError("at least one dypes should be provided") + if len(dtypes) == 1: + return dtypes[0] + unique_dtypes = set(dtypes) + if len(unique_dtypes) == 1: + return unique_dtypes.pop() # Implicit conversion currently only supported for numeric types hierarchy: list[Dtype] = [ pd.BooleanDtype(), @@ -670,9 +674,9 @@ def lcd_type(dtype1: Dtype, dtype2: Dtype) -> Dtype: pd.ArrowDtype(pa.decimal256(76, 38)), pd.Float64Dtype(), ] - if (dtype1 not in hierarchy) or (dtype2 not in hierarchy): + if any([dtype not in hierarchy for dtype in dtypes]): return None - lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2)) + lcd_index = max([hierarchy.index(dtype) for dtype in dtypes]) return hierarchy[lcd_index] diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index f41a21add0..3230ad2a89 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2390,12 +2390,27 @@ def test_dataframe_pct_change(scalars_df_index, scalars_pandas_df_index, periods def test_dataframe_agg_single_string(scalars_dfs): numeric_cols = ["int64_col", "int64_too", "float64_col"] scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() pd_result = scalars_pandas_df[numeric_cols].agg("sum") - # Pandas may produce narrower numeric types, but bigframes always produces Float64 - pd_result = pd_result.astype("Float64") - pd.testing.assert_series_equal(pd_result, bf_result, check_index_type=False) + assert bf_result.dtype == "Float64" + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + +def test_dataframe_agg_int_single_string(scalars_dfs): + numeric_cols = ["int64_col", "int64_too", "bool_col"] + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df[numeric_cols].agg("sum").to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg("sum") + + assert bf_result.dtype == "Int64" + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) def test_dataframe_agg_multi_string(scalars_dfs): @@ -2431,6 +2446,27 @@ def test_dataframe_agg_multi_string(scalars_dfs): ).all() +def test_dataframe_agg_int_multi_string(scalars_dfs): + numeric_cols = ["int64_col", "int64_too", "bool_col"] + aggregations = [ + "sum", + "nunique", + "count", + ] + scalars_df, scalars_pandas_df = scalars_dfs + bf_result = scalars_df[numeric_cols].agg(aggregations).to_pandas() + pd_result = scalars_pandas_df[numeric_cols].agg(aggregations) + + for dtype in bf_result.dtypes: + assert dtype == "Int64" + + # Pandas may produce narrower numeric types + # Pandas has object index type + pd.testing.assert_frame_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + @skip_legacy_pandas def test_df_describe(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs @@ -2982,6 +3018,58 @@ def test_loc_setitem_bool_series_scalar_error(scalars_dfs): pd_df.loc[pd_df["int64_too"] == 1, "string_col"] = 99 +@pytest.mark.parametrize( + ("col", "op"), + [ + # Int aggregates + pytest.param("int64_col", lambda x: x.sum(), id="int-sum"), + pytest.param("int64_col", lambda x: x.min(), id="int-min"), + 
pytest.param("int64_col", lambda x: x.max(), id="int-max"), + pytest.param("int64_col", lambda x: x.count(), id="int-count"), + pytest.param("int64_col", lambda x: x.nunique(), id="int-nunique"), + # Float aggregates + pytest.param("float64_col", lambda x: x.count(), id="float-count"), + pytest.param("float64_col", lambda x: x.nunique(), id="float-nunique"), + # Bool aggregates + pytest.param("bool_col", lambda x: x.sum(), id="bool-sum"), + pytest.param("bool_col", lambda x: x.count(), id="bool-count"), + pytest.param("bool_col", lambda x: x.nunique(), id="bool-nunique"), + # String aggregates + pytest.param("string_col", lambda x: x.count(), id="string-count"), + pytest.param("string_col", lambda x: x.nunique(), id="string-nunique"), + ], +) +def test_dataframe_aggregate_int(scalars_df_index, scalars_pandas_df_index, col, op): + bf_result = op(scalars_df_index[[col]]).to_pandas() + pd_result = op(scalars_pandas_df_index[[col]]) + + # Check dtype separately + assert bf_result.dtype == "Int64" + + # Pandas may produce narrower numeric types + # Pandas has object index type + assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False) + + +@pytest.mark.parametrize( + ("col", "op"), + [ + pytest.param("bool_col", lambda x: x.min(), id="bool-min"), + pytest.param("bool_col", lambda x: x.max(), id="bool-max"), + ], +) +def test_dataframe_aggregate_bool(scalars_df_index, scalars_pandas_df_index, col, op): + bf_result = op(scalars_df_index[[col]]).to_pandas() + pd_result = op(scalars_pandas_df_index[[col]]) + + # Check dtype separately + assert bf_result.dtype == "boolean" + + # Pandas may produce narrower numeric types + # Pandas has object index type + assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False) + + @pytest.mark.parametrize( ("ordered"), [ @@ -2990,34 +3078,38 @@ def test_loc_setitem_bool_series_scalar_error(scalars_dfs): ], ) @pytest.mark.parametrize( - ("op"), + ("op", "bf_dtype"), [ - (lambda x: x.sum(numeric_only=True)), - (lambda x: x.mean(numeric_only=True)), - (lambda x: x.min(numeric_only=True)), - (lambda x: x.max(numeric_only=True)), - (lambda x: x.std(numeric_only=True)), - (lambda x: x.var(numeric_only=True)), - (lambda x: x.count(numeric_only=False)), - (lambda x: x.nunique()), + (lambda x: x.sum(numeric_only=True), "Float64"), + (lambda x: x.mean(numeric_only=True), "Float64"), + (lambda x: x.min(numeric_only=True), "Float64"), + (lambda x: x.max(numeric_only=True), "Float64"), + (lambda x: x.std(numeric_only=True), "Float64"), + (lambda x: x.var(numeric_only=True), "Float64"), + (lambda x: x.count(numeric_only=False), "Int64"), + (lambda x: x.nunique(), "Int64"), ], ids=["sum", "mean", "min", "max", "std", "var", "count", "nunique"], ) -def test_dataframe_aggregates(scalars_df_index, scalars_pandas_df_index, op, ordered): +def test_dataframe_aggregates( + scalars_df_index, scalars_pandas_df_index, op, bf_dtype, ordered +): col_names = ["int64_too", "float64_col", "string_col", "int64_col", "bool_col"] bf_series = op(scalars_df_index[col_names]) - pd_series = op(scalars_pandas_df_index[col_names]) bf_result = bf_series.to_pandas(ordered=ordered) + pd_result = op(scalars_pandas_df_index[col_names]) + + # Check dtype separately + assert bf_result.dtype == bf_dtype # Pandas may produce narrower numeric types, but bigframes always produces Float64 # Pandas has object index type - pd_series.index = pd_series.index.astype(pd.StringDtype(storage="pyarrow")) assert_series_equal( - pd_series, + pd_result, bf_result, + 
check_dtype=False, check_index_type=False, ignore_order=not ordered, - check_dtype=False, ) From 3acc4943e30446a8013a86b76823de8dcc0ab5a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Thu, 25 Apr 2024 09:14:10 -0500 Subject: [PATCH 07/18] refactor: cache table metadata alongside snapshot time (#636) This ensures the cached `primary_keys` is more likely to be correct, in case the user called ALTER TABLE after we originally cached the snapshot time. --- bigframes/session/__init__.py | 52 +++++++++------------------- bigframes/session/_io/bigquery.py | 54 ++++++++++++++++++++++++++++++ tests/unit/session/test_session.py | 7 ++-- 3 files changed, 75 insertions(+), 38 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index f3f1ffce16..1a0ea20e55 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -231,7 +231,9 @@ def __init__( # Now that we're starting the session, don't allow the options to be # changed. context._session_started = True - self._df_snapshot: Dict[bigquery.TableReference, datetime.datetime] = {} + self._df_snapshot: Dict[ + bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table] + ] = {} @property def bqclient(self): @@ -698,16 +700,25 @@ def _get_snapshot_sql_and_primary_key( column(s), then return those too so that ordering generation can be avoided. """ - # If there are primary keys defined, the query engine assumes these - # columns are unique, even if the constraint is not enforced. We make - # the same assumption and use these columns as the total ordering keys. - table = self.bqclient.get_table(table_ref) + ( + snapshot_timestamp, + table, + ) = bigframes_io.get_snapshot_datetime_and_table_metadata( + self.bqclient, + table_ref=table_ref, + api_name=api_name, + cache=self._df_snapshot, + use_cache=use_cache, + ) if table.location.casefold() != self._location.casefold(): raise ValueError( f"Current session is in {self._location} but dataset '{table.project}.{table.dataset_id}' is located in {table.location}" ) + # If there are primary keys defined, the query engine assumes these + # columns are unique, even if the constraint is not enforced. We make + # the same assumption and use these columns as the total ordering keys. primary_keys = None if ( (table_constraints := getattr(table, "table_constraints", None)) is not None @@ -718,37 +729,6 @@ def _get_snapshot_sql_and_primary_key( ): primary_keys = columns - job_config = bigquery.QueryJobConfig() - job_config.labels["bigframes-api"] = api_name - if use_cache and table_ref in self._df_snapshot.keys(): - snapshot_timestamp = self._df_snapshot[table_ref] - - # Cache hit could be unexpected. See internal issue 329545805. - # Raise a warning with more information about how to avoid the - # problems with the cache. - warnings.warn( - f"Reading cached table from {snapshot_timestamp} to avoid " - "incompatibilies with previous reads of this table. 
To read " - "the latest version, set `use_cache=False` or close the " - "current session with Session.close() or " - "bigframes.pandas.close_session().", - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - stacklevel=6, - ) - else: - snapshot_timestamp = list( - self.bqclient.query( - "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", - job_config=job_config, - ).result() - )[0][0] - self._df_snapshot[table_ref] = snapshot_timestamp - try: table_expression = self.ibis_client.sql( bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp) diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py index ac6ba4bae4..94576cfa12 100644 --- a/bigframes/session/_io/bigquery.py +++ b/bigframes/session/_io/bigquery.py @@ -23,6 +23,7 @@ import types from typing import Dict, Iterable, Optional, Sequence, Tuple, Union import uuid +import warnings import google.api_core.exceptions import google.cloud.bigquery as bigquery @@ -121,6 +122,59 @@ def table_ref_to_sql(table: bigquery.TableReference) -> str: return f"`{table.project}`.`{table.dataset_id}`.`{table.table_id}`" +def get_snapshot_datetime_and_table_metadata( + bqclient: bigquery.Client, + table_ref: bigquery.TableReference, + *, + api_name: str, + cache: Dict[bigquery.TableReference, Tuple[datetime.datetime, bigquery.Table]], + use_cache: bool = True, +) -> Tuple[datetime.datetime, bigquery.Table]: + cached_table = cache.get(table_ref) + if use_cache and cached_table is not None: + snapshot_timestamp, _ = cached_table + + # Cache hit could be unexpected. See internal issue 329545805. + # Raise a warning with more information about how to avoid the + # problems with the cache. + warnings.warn( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session().", + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + stacklevel=7, + ) + return cached_table + + # TODO(swast): It's possible that the table metadata is changed between now + # and when we run the CURRENT_TIMESTAMP() query to see when we can time + # travel to. Find a way to fetch the table metadata and BQ's current time + # atomically. + table = bqclient.get_table(table_ref) + + # TODO(b/336521938): Refactor to make sure we set the "bigframes-api" + # whereever we execute a query. 
+ job_config = bigquery.QueryJobConfig() + job_config.labels["bigframes-api"] = api_name + snapshot_timestamp = list( + bqclient.query( + "SELECT CURRENT_TIMESTAMP() AS `current_timestamp`", + job_config=job_config, + ).result() + )[0][0] + cached_table = (snapshot_timestamp, table) + cache[table_ref] = cached_table + return cached_table + + def create_snapshot_sql( table_ref: bigquery.TableReference, current_timestamp: datetime.datetime ) -> str: diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 543196066a..4ba47190bd 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -42,8 +42,11 @@ def test_read_gbq_cached_table(): google.cloud.bigquery.DatasetReference("my-project", "my_dataset"), "my_table", ) - session._df_snapshot[table_ref] = datetime.datetime( - 1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc + table = google.cloud.bigquery.Table(table_ref) + table._properties["location"] = session._location + session._df_snapshot[table_ref] = ( + datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc), + table, ) with pytest.warns(UserWarning, match=re.escape("use_cache=False")): From 3ffc1d275ae110bffea2f08e63ef75b053764a0c Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Thu, 25 Apr 2024 09:39:18 -0700 Subject: [PATCH 08/18] feat: support the `score` method for `PaLM2TextGenerator` (#634) * feat: support the score method for PaLM2TextGenerator * address comments * address additional comments * address minor comments --- bigframes/ml/core.py | 11 ++++ bigframes/ml/llm.py | 59 ++++++++++++++++++- bigframes/ml/sql.py | 10 ++++ tests/system/load/test_llm.py | 56 +++++++++++++++--- tests/unit/ml/test_sql.py | 14 +++++ .../sklearn/ensemble/_forest.py | 4 +- .../bigframes_vendored/xgboost/sklearn.py | 4 +- 7 files changed, 146 insertions(+), 12 deletions(-) diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index b94ae39687..12c881c19a 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -187,6 +187,17 @@ def evaluate(self, input_data: Optional[bpd.DataFrame] = None): return self._session.read_gbq(sql) + def llm_evaluate( + self, + input_data: bpd.DataFrame, + task_type: Optional[str] = None, + ): + sql = self._model_manipulation_sql_generator.ml_llm_evaluate( + input_data, task_type + ) + + return self._session.read_gbq(sql) + def arima_evaluate(self, show_all_candidate_models: bool = False): sql = self._model_manipulation_sql_generator.ml_arima_evaluate( show_all_candidate_models diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index 37a38cdd5c..4a58152d14 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -220,7 +220,7 @@ def predict( Args: X (bigframes.dataframe.DataFrame or bigframes.series.Series): - Input DataFrame or Series, which needs to contain a column with name "prompt". Only the column will be used as input. + Input DataFrame or Series, which contains only one column of prompts. Prompts can include preamble, questions, suggestions, instructions, or examples. temperature (float, default 0.0): @@ -310,6 +310,63 @@ def predict( return df + def score( + self, + X: Union[bpd.DataFrame, bpd.Series], + y: Union[bpd.DataFrame, bpd.Series], + task_type: Literal[ + "text_generation", "classification", "summarization", "question_answering" + ] = "text_generation", + ) -> bpd.DataFrame: + """Calculate evaluation metrics of the model. + + .. 
note:: + + This product or feature is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the + Service Specific Terms(https://cloud.google.com/terms/service-terms#1). Pre-GA products and features are available "as is" + and might have limited support. For more information, see the launch stage descriptions + (https://cloud.google.com/products#product-launch-stages). + + .. note:: + + Output matches that of the BigQuery ML.EVALUTE function. + See: https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-evaluate#remote-model-llm + for the outputs relevant to this model type. + + Args: + X (bigframes.dataframe.DataFrame or bigframes.series.Series): + A BigQuery DataFrame as evaluation data, which contains only one column of input_text + that contains the prompt text to use when evaluating the model. + y (bigframes.dataframe.DataFrame or bigframes.series.Series): + A BigQuery DataFrame as evaluation labels, which contains only one column of output_text + that you would expect to be returned by the model. + task_type (str): + The type of the task for LLM model. Default to "text_generation". + Possible values: "text_generation", "classification", "summarization", and "question_answering". + + Returns: + bigframes.dataframe.DataFrame: The DataFrame as evaluation result. + """ + if not self._bqml_model: + raise RuntimeError("A model must be fitted before score") + + X, y = utils.convert_to_dataframe(X, y) + + if len(X.columns) != 1 or len(y.columns) != 1: + raise ValueError( + f"Only support one column as input for X and y. {constants.FEEDBACK_LINK}" + ) + + # BQML identified the column by name + X_col_label = cast(blocks.Label, X.columns[0]) + y_col_label = cast(blocks.Label, y.columns[0]) + X = X.rename(columns={X_col_label: "input_text"}) + y = y.rename(columns={y_col_label: "output_text"}) + + input_data = X.join(y, how="outer") + + return self._bqml_model.llm_evaluate(input_data, task_type) + def to_gbq(self, model_name: str, replace: bool = False) -> PaLM2TextGenerator: """Save the model to BigQuery. diff --git a/bigframes/ml/sql.py b/bigframes/ml/sql.py index 59c768ce81..3679be16c6 100644 --- a/bigframes/ml/sql.py +++ b/bigframes/ml/sql.py @@ -318,6 +318,16 @@ def ml_evaluate(self, source_df: Optional[bpd.DataFrame] = None) -> str: return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`, ({source_sql}))""" + # ML evaluation TVFs + def ml_llm_evaluate( + self, source_df: bpd.DataFrame, task_type: Optional[str] = None + ) -> str: + """Encode ML.EVALUATE for BQML""" + # Note: don't need index as evaluate returns a new table + source_sql, _, _ = source_df._to_sql_query(include_index=False) + return f"""SELECT * FROM ML.EVALUATE(MODEL `{self._model_name}`, + ({source_sql}), STRUCT("{task_type}" AS task_type))""" + # ML evaluation TVFs def ml_arima_evaluate(self, show_all_candidate_models: bool = False) -> str: """Encode ML.ARMIA_EVALUATE for BQML""" diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py index d56f6100c1..835b31955e 100644 --- a/tests/system/load/test_llm.py +++ b/tests/system/load/test_llm.py @@ -22,13 +22,12 @@ def llm_fine_tune_df_default_index( session: bigframes.Session, ) -> bigframes.dataframe.DataFrame: - sql = """ -SELECT - CONCAT("Please do sentiment analysis on the following text and only output a number from 0 to 5 where 0 means sadness, 1 means joy, 2 means love, 3 means anger, 4 means fear, and 5 means surprise. 
Text: ", text) as prompt, - CAST(label AS STRING) as label -FROM `llm_tuning.emotion_classification_train` -""" - return session.read_gbq(sql) + training_table_name = "llm_tuning.emotion_classification_train" + df = session.read_gbq(training_table_name) + prefix = "Please do sentiment analysis on the following text and only output a number from 0 to 5 where 0 means sadness, 1 means joy, 2 means love, 3 means anger, 4 means fear, and 5 means surprise. Text: " + df["prompt"] = prefix + df["text"] + df["label"] = df["label"].astype("string") + return df @pytest.fixture(scope="session") @@ -69,3 +68,46 @@ def test_llm_palm_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_ assert all(series.str.len() == 1) # TODO(ashleyxu b/335492787): After bqml rolled out version control: save, load, check parameters to ensure configuration was kept + + +def test_llm_palm_score(llm_fine_tune_df_default_index): + model = bigframes.ml.llm.PaLM2TextGenerator(model_name="text-bison") + + # Check score to ensure the model was fitted + score_result = model.score( + X=llm_fine_tune_df_default_index[["prompt"]], + y=llm_fine_tune_df_default_index[["label"]], + ).to_pandas() + score_result_col = score_result.columns.to_list() + expected_col = [ + "bleu4_score", + "rouge-l_precision", + "rouge-l_recall", + "rouge-l_f1_score", + "evaluation_status", + ] + assert all(col in score_result_col for col in expected_col) + + +def test_llm_palm_score_params(llm_fine_tune_df_default_index): + model = bigframes.ml.llm.PaLM2TextGenerator( + model_name="text-bison", max_iterations=1 + ) + + # Check score to ensure the model was fitted + score_result = model.score( + X=llm_fine_tune_df_default_index["prompt"], + y=llm_fine_tune_df_default_index["label"], + task_type="classification", + ).to_pandas() + score_result_col = score_result.columns.to_list() + expected_col = [ + "trial_id", + "precision", + "recall", + "accuracy", + "f1_score", + "log_loss", + "roc_auc", + ] + assert all(col in score_result_col for col in expected_col) diff --git a/tests/unit/ml/test_sql.py b/tests/unit/ml/test_sql.py index 3560f05cb6..1a5e8fe962 100644 --- a/tests/unit/ml/test_sql.py +++ b/tests/unit/ml/test_sql.py @@ -319,6 +319,20 @@ def test_ml_predict_correct( ) +def test_ml_llm_evaluate_correct( + model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator, + mock_df: bpd.DataFrame, +): + sql = model_manipulation_sql_generator.ml_llm_evaluate( + source_df=mock_df, task_type="CLASSIFICATION" + ) + assert ( + sql + == """SELECT * FROM ML.EVALUATE(MODEL `my_project_id.my_dataset_id.my_model_id`, + (input_X_sql), STRUCT("CLASSIFICATION" AS task_type))""" + ) + + def test_ml_evaluate_correct( model_manipulation_sql_generator: ml_sql.ModelManipulationSqlGenerator, mock_df: bpd.DataFrame, diff --git a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py index 53a211dd7f..a55b7b80d3 100644 --- a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py +++ b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py @@ -95,7 +95,7 @@ class RandomForestRegressor(ForestRegressor): Number of parallel trees constructed during each iteration. Default to 100. Minimum value is 2. tree_method (Optional[str]): Specify which tree method to use. Default to "auto". If this parameter is set to - default, XGBoost will choose the most conservative option available. Possible values: ""exact", "approx", + default, XGBoost will choose the most conservative option available. 
Possible values: "exact", "approx", "hist". min_child_weight (Optional[float]): Minimum sum of instance weight(hessian) needed in a child. Default to 1. @@ -160,7 +160,7 @@ class RandomForestClassifier(ForestClassifier): Number of parallel trees constructed during each iteration. Default to 100. Minimum value is 2. tree_method (Optional[str]): Specify which tree method to use. Default to "auto". If this parameter is set to - default, XGBoost will choose the most conservative option available. Possible values: ""exact", "approx", + default, XGBoost will choose the most conservative option available. Possible values: "exact", "approx", "hist". min_child_weight (Optional[float]): Minimum sum of instance weight(hessian) needed in a child. Default to 1. diff --git a/third_party/bigframes_vendored/xgboost/sklearn.py b/third_party/bigframes_vendored/xgboost/sklearn.py index 424b17a371..5a2a69dff4 100644 --- a/third_party/bigframes_vendored/xgboost/sklearn.py +++ b/third_party/bigframes_vendored/xgboost/sklearn.py @@ -63,7 +63,7 @@ class XGBRegressor(XGBModel, XGBRegressorBase): Type of normalization algorithm for DART booster. Possible values: "TREE", "FOREST". Default to "TREE". tree_method (Optional[str]): Specify which tree method to use. Default to "auto". If this parameter is set to - default, XGBoost will choose the most conservative option available. Possible values: ""exact", "approx", + default, XGBoost will choose the most conservative option available. Possible values: "exact", "approx", "hist". min_child_weight (Optional[float]): Minimum sum of instance weight(hessian) needed in a child. Default to 1. @@ -110,7 +110,7 @@ class XGBClassifier(XGBModel, XGBClassifierMixIn, XGBClassifierBase): Type of normalization algorithm for DART booster. Possible values: "TREE", "FOREST". Default to "TREE". tree_method (Optional[str]): Specify which tree method to use. Default to "auto". If this parameter is set to - default, XGBoost will choose the most conservative option available. Possible values: ""exact", "approx", + default, XGBoost will choose the most conservative option available. Possible values: "exact", "approx", "hist". min_child_weight (Optional[float]): Minimum sum of instance weight(hessian) needed in a child. Default to 1. 
From 44b738df07d0ee9d9ae2ced339a123f31139f887 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Thu, 25 Apr 2024 11:24:30 -0700 Subject: [PATCH 09/18] perf: Cache transpose to allow performant retranspose (#635) --- bigframes/core/blocks.py | 80 ++++++++++++++++++++-------- bigframes/core/compile/compiled.py | 17 +++--- bigframes/core/ordering.py | 2 + bigframes/dataframe.py | 47 ++++++++-------- tests/system/small/test_dataframe.py | 14 +++++ 5 files changed, 105 insertions(+), 55 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 2a888125f8..5965c96374 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -27,7 +27,7 @@ import os import random import typing -from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple +from typing import Iterable, List, Literal, Mapping, Optional, Sequence, Tuple, Union import warnings import google.cloud.bigquery as bigquery @@ -105,6 +105,8 @@ def __init__( index_columns: Iterable[str], column_labels: typing.Union[pd.Index, typing.Iterable[Label]], index_labels: typing.Union[pd.Index, typing.Iterable[Label], None] = None, + *, + transpose_cache: Optional[Block] = None, ): """Construct a block object, will create default index if no index columns specified.""" index_columns = list(index_columns) @@ -144,6 +146,7 @@ def __init__( # TODO(kemppeterson) Add a cache for corr to parallel the single-column stats. self._stats_cache[" ".join(self.index_columns)] = {} + self._transpose_cache: Optional[Block] = transpose_cache @classmethod def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block: @@ -716,6 +719,15 @@ def with_column_labels( index_labels=self.index.names, ) + def with_transpose_cache(self, transposed: Block): + return Block( + self._expr, + index_columns=self.index_columns, + column_labels=self._column_labels, + index_labels=self.index.names, + transpose_cache=transposed, + ) + def with_index_labels(self, value: typing.Sequence[Label]) -> Block: if len(value) != len(self.index_columns): raise ValueError( @@ -804,18 +816,35 @@ def multi_apply_window_op( def multi_apply_unary_op( self, columns: typing.Sequence[str], - op: ops.UnaryOp, + op: Union[ops.UnaryOp, ex.Expression], ) -> Block: + if isinstance(op, ops.UnaryOp): + input_varname = guid.generate_guid() + expr = op.as_expr(input_varname) + else: + input_varnames = op.unbound_variables + assert len(input_varnames) == 1 + expr = op + input_varname = input_varnames[0] + block = self - for i, col_id in enumerate(columns): + for col_id in columns: label = self.col_id_to_label[col_id] - block, result_id = block.apply_unary_op( - col_id, - op, - result_label=label, + block, result_id = block.project_expr( + expr.bind_all_variables({input_varname: ex.free_var(col_id)}), + label=label, ) block = block.copy_values(result_id, col_id) block = block.drop_columns([result_id]) + # Special case, we can preserve transpose cache for full-frame unary ops + if (self._transpose_cache is not None) and set(self.value_columns) == set( + columns + ): + transpose_columns = self._transpose_cache.value_columns + new_transpose_cache = self._transpose_cache.multi_apply_unary_op( + transpose_columns, op + ) + block = block.with_transpose_cache(new_transpose_cache) return block def apply_window_op( @@ -922,20 +951,17 @@ def aggregate_all_and_stack( (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id) for col_id in self.value_columns ] - index_col_ids = [ - guid.generate_guid() for i in range(self.column_labels.nlevels) - ] - 
result_expr = self.expr.aggregate(aggregations, dropna=dropna).unpivot( - row_labels=self.column_labels.to_list(), - index_col_ids=index_col_ids, - unpivot_columns=tuple([(value_col_id, tuple(self.value_columns))]), - ) + index_id = guid.generate_guid() + result_expr = self.expr.aggregate( + aggregations, dropna=dropna + ).assign_constant(index_id, None, None) + # Transpose as last operation so that final block has valid transpose cache return Block( result_expr, - index_columns=index_col_ids, - column_labels=[None], - index_labels=self.column_labels.names, - ) + index_columns=[index_id], + column_labels=self.column_labels, + index_labels=[None], + ).transpose(original_row_index=pd.Index([None])) else: # axis_n == 1 # using offsets as identity to group on. # TODO: Allow to promote identity/total_order columns instead for better perf @@ -1575,10 +1601,19 @@ def melt( index_columns=[index_id], ) - def transpose(self) -> Block: - """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows""" + def transpose(self, *, original_row_index: Optional[pd.Index] = None) -> Block: + """Transpose the block. Will fail if dtypes aren't coercible to a common type or too many rows. + Can provide the original_row_index directly if it is already known, otherwise a query is needed. + """ + if self._transpose_cache is not None: + return self._transpose_cache.with_transpose_cache(self) + original_col_index = self.column_labels - original_row_index = self.index.to_pandas() + original_row_index = ( + original_row_index + if original_row_index is not None + else self.index.to_pandas() + ) original_row_count = len(original_row_index) if original_row_count > bigframes.constants.MAX_COLUMNS: raise NotImplementedError( @@ -1619,6 +1654,7 @@ def transpose(self) -> Block: result.with_column_labels(original_row_index) .order_by([ordering.ascending_over(result.index_columns[-1])]) .drop_levels([result.index_columns[-1]]) + .with_transpose_cache(self) ) def _create_stack_column( diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index a59d599679..88c1006c79 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -823,17 +823,14 @@ def to_sql( for col in baked_ir.column_ids ] selection = ", ".join(map(lambda col_id: f"`{col_id}`", output_columns)) - order_by_clause = baked_ir._ordering_clause( - baked_ir._ordering.all_ordering_columns - ) - sql = textwrap.dedent( - f"SELECT {selection}\n" - "FROM (\n" - f"{sql}\n" - ")\n" - f"{order_by_clause}\n" - ) + sql = textwrap.dedent(f"SELECT {selection}\n" "FROM (\n" f"{sql}\n" ")\n") + # Single row frames may not have any ordering columns + if len(baked_ir._ordering.all_ordering_columns) > 0: + order_by_clause = baked_ir._ordering_clause( + baked_ir._ordering.all_ordering_columns + ) + sql += f"{order_by_clause}\n" else: sql = ibis_bigquery.Backend().compile( self._to_ibis_expr( diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index 2543a3b722..9009e31be3 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -167,6 +167,8 @@ def _truncate_ordering( truncated_refs.append(order_part) if columns_seen.issuperset(must_see): return tuple(truncated_refs) + if len(must_see) == 0: + return () raise ValueError("Ordering did not contain all total_order_cols") def with_reverse(self): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index a55b7f569b..48c4af7a37 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -693,18 
+693,19 @@ def _apply_binop( def _apply_scalar_binop( self, other: float | int, op: ops.BinaryOp, reverse: bool = False ) -> DataFrame: - block = self._block - for column_id, label in zip( - self._block.value_columns, self._block.column_labels - ): - expr = ( - op.as_expr(ex.const(other), column_id) - if reverse - else op.as_expr(column_id, ex.const(other)) + if reverse: + expr = op.as_expr( + left_input=ex.const(other), + right_input=bigframes.core.guid.generate_guid(), ) - block, _ = block.project_expr(expr, label) - block = block.drop_columns([column_id]) - return DataFrame(block) + else: + expr = op.as_expr( + left_input=bigframes.core.guid.generate_guid(), + right_input=ex.const(other), + ) + return DataFrame( + self._block.multi_apply_unary_op(self._block.value_columns, expr) + ) def _apply_series_binop_axis_0( self, @@ -1974,7 +1975,7 @@ def any( else: frame = self._drop_non_bool() block = frame._block.aggregate_all_and_stack(agg_ops.any_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def all( self, axis: typing.Union[str, int] = 0, *, bool_only: bool = False @@ -1984,7 +1985,7 @@ def all( else: frame = self._drop_non_bool() block = frame._block.aggregate_all_and_stack(agg_ops.all_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def sum( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -1994,7 +1995,7 @@ def sum( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.sum_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def mean( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -2004,7 +2005,7 @@ def mean( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.mean_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def median( self, *, numeric_only: bool = False, exact: bool = True @@ -2019,7 +2020,7 @@ def median( return result else: block = frame._block.aggregate_all_and_stack(agg_ops.median_op) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def quantile( self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False @@ -2052,7 +2053,7 @@ def std( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.std_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def var( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -2062,7 +2063,7 @@ def var( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.var_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def min( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -2072,7 +2073,7 @@ def min( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.min_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def max( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -2082,7 +2083,7 @@ def max( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.max_op, axis=axis) - 
return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def prod( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False @@ -2092,7 +2093,7 @@ def prod( else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.product_op, axis=axis) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) product = prod product.__doc__ = inspect.getdoc(vendored_pandas_frame.DataFrame.prod) @@ -2103,11 +2104,11 @@ def count(self, *, numeric_only: bool = False) -> bigframes.series.Series: else: frame = self._drop_non_numeric() block = frame._block.aggregate_all_and_stack(agg_ops.count_op) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def nunique(self) -> bigframes.series.Series: block = self._block.aggregate_all_and_stack(agg_ops.nunique_op) - return bigframes.series.Series(block.select_column("values")) + return bigframes.series.Series(block) def agg( self, func: str | typing.Sequence[str] diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 3230ad2a89..2a4b53403d 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2524,6 +2524,20 @@ def test_df_transpose_error(): dataframe.DataFrame([[1, "hello"], [2, "world"]]).transpose() +def test_df_transpose_repeated_uses_cache(): + bf_df = dataframe.DataFrame([[1, 2.5], [2, 3.5]]) + pd_df = pandas.DataFrame([[1, 2.5], [2, 3.5]]) + # Transposing many times so that operation will fail from complexity if not using cache + for i in range(10): + # Cache still works even with simple scalar binop + bf_df = bf_df.transpose() + i + pd_df = pd_df.transpose() + i + + pd.testing.assert_frame_equal( + pd_df, bf_df.to_pandas(), check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize( ("ordered"), [ From ce5649513b66c5191a56fc1fd29240b5dbe02394 Mon Sep 17 00:00:00 2001 From: Henry Solberg Date: Thu, 25 Apr 2024 11:36:36 -0700 Subject: [PATCH 10/18] fix: Allow to_pandas to download more than 10GB (#637) * fix: Allow to_pandas to download more than 10GB * remove unecessary line --- bigframes/core/blocks.py | 9 +++++++-- bigframes/session/__init__.py | 16 ++++++++++------ tests/system/load/test_large_tables.py | 11 +++++++++++ 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 5965c96374..4ff8a1836b 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -517,9 +517,14 @@ def _materialize_local( ) -> Tuple[pd.DataFrame, bigquery.QueryJob]: """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. 
- results_iterator, query_job = self.session._execute( - self.expr, sorted=materialize_options.ordered + _, query_job = self.session._query_to_destination( + self.session._to_sql(self.expr, sorted=True), + list(self.index_columns), + api_name="cached", + do_clustering=False, ) + results_iterator = query_job.result() + table_size = ( self.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 1a0ea20e55..ecb672f090 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -430,7 +430,8 @@ def _query_to_destination( index_cols: List[str], api_name: str, configuration: dict = {"query": {"useQueryCache": True}}, - ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]: + do_clustering=True, + ) -> Tuple[Optional[bigquery.TableReference], bigquery.QueryJob]: # If a dry_run indicates this is not a query type job, then don't # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement. dry_run_config = bigquery.QueryJobConfig() @@ -444,11 +445,14 @@ def _query_to_destination( # internal issue 303057336. # Since we have a `statement_type == 'SELECT'`, schema should be populated. schema = typing.cast(Iterable[bigquery.SchemaField], dry_run_job.schema) - cluster_cols = [ - item.name - for item in schema - if (item.name in index_cols) and _can_cluster_bq(item) - ][:_MAX_CLUSTER_COLUMNS] + if do_clustering: + cluster_cols = [ + item.name + for item in schema + if (item.name in index_cols) and _can_cluster_bq(item) + ][:_MAX_CLUSTER_COLUMNS] + else: + cluster_cols = [] temp_table = self._create_empty_temp_table(schema, cluster_cols) timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get( diff --git a/tests/system/load/test_large_tables.py b/tests/system/load/test_large_tables.py index 22baa2268f..871c846c79 100644 --- a/tests/system/load/test_large_tables.py +++ b/tests/system/load/test_large_tables.py @@ -90,3 +90,14 @@ def test_to_pandas_batches_large_table(): del df assert row_count == expected_row_count + + +def test_to_pandas_large_table(): + df = bpd.read_gbq("load_testing.scalars_10gb") + # df will be downloaded locally + expected_row_count, expected_column_count = df.shape + + df = df.to_pandas() + row_count, column_count = df.shape + assert column_count == expected_column_count + assert row_count == expected_row_count From dfeaad0ae3b3557a9e8ccb21ddbdc55cfd611e0f Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 25 Apr 2024 22:10:54 +0000 Subject: [PATCH 11/18] feat: expose gcf max timeout in `remote_function` (#639) * feat: expose gcf max timeout in `remote_function` * remove duplicate test case from parametrize --- bigframes/functions/remote_function.py | 28 +++++++++++-- bigframes/pandas/__init__.py | 2 + bigframes/session/__init__.py | 12 ++++++ tests/system/large/test_remote_function.py | 49 ++++++++++++++++++++++ 4 files changed, 88 insertions(+), 3 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index f866575a26..4bb667ccc7 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -341,7 +341,9 @@ def generate_cloud_function_code(self, def_, dir, package_requirements=None): entry_point = self.generate_cloud_function_main_code(def_, dir) return entry_point - def create_cloud_function(self, def_, cf_name, package_requirements=None): + def create_cloud_function( + self, def_, cf_name, package_requirements=None, 
cloud_function_timeout=600 + ): """Create a cloud function from the given user defined function.""" # Build and deploy folder structure containing cloud function @@ -409,7 +411,14 @@ def create_cloud_function(self, def_, cf_name, package_requirements=None): ) function.service_config = functions_v2.ServiceConfig() function.service_config.available_memory = "1024M" - function.service_config.timeout_seconds = 600 + if cloud_function_timeout is not None: + if cloud_function_timeout > 1200: + raise ValueError( + "BigQuery remote function can wait only up to 20 minutes" + ", see for more details " + "https://cloud.google.com/bigquery/quotas#remote_function_limits." + ) + function.service_config.timeout_seconds = cloud_function_timeout function.service_config.service_account_email = ( self._cloud_function_service_account ) @@ -456,6 +465,7 @@ def provision_bq_remote_function( name, package_requirements, max_batching_rows, + cloud_function_timeout, ): """Provision a BigQuery remote function.""" # If reuse of any existing function with the same name (indicated by the @@ -477,7 +487,7 @@ def provision_bq_remote_function( # Create the cloud function if it does not exist if not cf_endpoint: cf_endpoint = self.create_cloud_function( - def_, cloud_function_name, package_requirements + def_, cloud_function_name, package_requirements, cloud_function_timeout ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") @@ -631,6 +641,7 @@ def remote_function( cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, max_batching_rows: Optional[int] = 1000, + cloud_function_timeout: Optional[int] = 600, ): """Decorator to turn a user defined function into a BigQuery remote function. @@ -756,6 +767,16 @@ def remote_function( `None` can be passed to let BQ remote functions service apply default batching. See for more details https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request. + cloud_function_timeout (int, Optional): + The maximum amount of time (in seconds) BigQuery should wait for + the cloud function to return a response. See for more details + https://cloud.google.com/functions/docs/configuring/timeout. + Please note that even though the cloud function (2nd gen) itself + allows seeting up to 60 minutes of timeout, BigQuery remote + function can wait only up to 20 minutes, see for more details + https://cloud.google.com/bigquery/quotas#remote_function_limits. + By default BigQuery DataFrames uses a 10 minute timeout. `None` + can be passed to let the cloud functions default timeout take effect. 
""" import bigframes.pandas as bpd @@ -880,6 +901,7 @@ def wrapper(f): name, packages, max_batching_rows, + cloud_function_timeout, ) # TODO: Move ibis logic to compiler step diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 96af6ab1b3..71ef4e609e 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -644,6 +644,7 @@ def remote_function( cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, max_batching_rows: Optional[int] = 1000, + cloud_function_timeout: Optional[int] = 600, ): return global_session.with_default_session( bigframes.session.Session.remote_function, @@ -658,6 +659,7 @@ def remote_function( cloud_function_kms_key_name=cloud_function_kms_key_name, cloud_function_docker_repository=cloud_function_docker_repository, max_batching_rows=max_batching_rows, + cloud_function_timeout=cloud_function_timeout, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index ecb672f090..973c87b59b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1529,6 +1529,7 @@ def remote_function( cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, max_batching_rows: Optional[int] = 1000, + cloud_function_timeout: Optional[int] = 600, ): """Decorator to turn a user defined function into a BigQuery remote function. Check out the code samples at: https://cloud.google.com/bigquery/docs/remote-functions#bigquery-dataframes. @@ -1632,6 +1633,16 @@ def remote_function( `None` can be passed to let BQ remote functions service apply default batching. See for more details https://cloud.google.com/bigquery/docs/remote-functions#limiting_number_of_rows_in_a_batch_request. + cloud_function_timeout (int, Optional): + The maximum amount of time (in seconds) BigQuery should wait for + the cloud function to return a response. See for more details + https://cloud.google.com/functions/docs/configuring/timeout. + Please note that even though the cloud function (2nd gen) itself + allows seeting up to 60 minutes of timeout, BigQuery remote + function can wait only up to 20 minutes, see for more details + https://cloud.google.com/bigquery/quotas#remote_function_limits. + By default BigQuery DataFrames uses a 10 minute timeout. `None` + can be passed to let the cloud functions default timeout take effect. Returns: callable: A remote function object pointing to the cloud assets created in the background to support the remote execution. 
The cloud assets can be @@ -1654,6 +1665,7 @@ def remote_function( cloud_function_kms_key_name=cloud_function_kms_key_name, cloud_function_docker_repository=cloud_function_docker_repository, max_batching_rows=max_batching_rows, + cloud_function_timeout=cloud_function_timeout, ) def read_gbq_function( diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index ec9acc292e..6cae893f9c 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1336,3 +1336,52 @@ def square(x): cleanup_remote_function_assets( session.bqclient, session.cloudfunctionsclient, square_remote ) + + +@pytest.mark.parametrize( + ("timeout_args", "effective_gcf_timeout"), + [ + pytest.param({}, 600, id="no-set"), + pytest.param({"cloud_function_timeout": None}, 60, id="set-None"), + pytest.param({"cloud_function_timeout": 1200}, 1200, id="set-max-allowed"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_timeout( + session, scalars_dfs, timeout_args, effective_gcf_timeout +): + try: + + def square(x): + return x * x + + square_remote = session.remote_function( + [int], int, reuse=False, **timeout_args + )(square) + + # Assert that the GCF is created with the intended maximum timeout + gcf = session.cloudfunctionsclient.get_function( + name=square_remote.bigframes_cloud_function + ) + assert gcf.service_config.timeout_seconds == effective_gcf_timeout + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df["int64_too"].apply(square_remote).to_pandas() + pd_result = scalars_pandas_df["int64_too"].apply(square) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, square_remote + ) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_gcf_timeout_max_supported_exceeded(session): + with pytest.raises(ValueError): + + @session.remote_function([int], int, reuse=False, cloud_function_timeout=1201) + def square(x): + return x * x From 9005c6e79297d7130e93a0e632eb3936aa145efe Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Fri, 26 Apr 2024 09:12:09 -0700 Subject: [PATCH 12/18] fix: Extend row hash to 128 bits to guarantee unique row id (#632) * fix: Extend row hash to 128 bits to guarantee unique row id * decide hash size based on row count * fix read_gbq tests * handle unknown row_num --- bigframes/session/__init__.py | 50 +++++++++++++++++++++--------- tests/unit/resources.py | 10 ++++++ tests/unit/session/test_session.py | 21 ++++++++++--- 3 files changed, 62 insertions(+), 19 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 973c87b59b..34047ff155 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -54,6 +54,7 @@ import google.api_core.gapic_v1.client_info import google.auth.credentials import google.cloud.bigquery as bigquery +import google.cloud.bigquery.table import google.cloud.bigquery_connection_v1 import google.cloud.bigquery_storage_v1 import google.cloud.functions_v2 @@ -693,7 +694,7 @@ def read_gbq_table( def _get_snapshot_sql_and_primary_key( self, - table_ref: bigquery.table.TableReference, + table: google.cloud.bigquery.table.Table, *, api_name: str, use_cache: bool = True, @@ -709,7 +710,7 @@ def _get_snapshot_sql_and_primary_key( table, ) = bigframes_io.get_snapshot_datetime_and_table_metadata( 
self.bqclient, - table_ref=table_ref, + table_ref=table.reference, api_name=api_name, cache=self._df_snapshot, use_cache=use_cache, @@ -735,7 +736,7 @@ def _get_snapshot_sql_and_primary_key( try: table_expression = self.ibis_client.sql( - bigframes_io.create_snapshot_sql(table_ref, snapshot_timestamp) + bigframes_io.create_snapshot_sql(table.reference, snapshot_timestamp) ) except google.api_core.exceptions.Forbidden as ex: if "Drive credentials" in ex.message: @@ -763,8 +764,9 @@ def _read_gbq_table( query, default_project=self.bqclient.project ) + table = self.bqclient.get_table(table_ref) (table_expression, primary_keys,) = self._get_snapshot_sql_and_primary_key( - table_ref, api_name=api_name, use_cache=use_cache + table, api_name=api_name, use_cache=use_cache ) total_ordering_cols = primary_keys @@ -836,9 +838,13 @@ def _read_gbq_table( ordering=ordering, ) else: - array_value = self._create_total_ordering(table_expression) + array_value = self._create_total_ordering( + table_expression, table_rows=table.num_rows + ) else: - array_value = self._create_total_ordering(table_expression) + array_value = self._create_total_ordering( + table_expression, table_rows=table.num_rows + ) value_columns = [col for col in array_value.column_ids if col not in index_cols] block = blocks.Block( @@ -1459,10 +1465,19 @@ def _create_empty_temp_table( def _create_total_ordering( self, table: ibis_types.Table, + table_rows: Optional[int], ) -> core.ArrayValue: # Since this might also be used as the index, don't use the default # "ordering ID" name. + + # For small tables, 64 bits is enough to avoid collisions, 128 bits will never ever collide no matter what + # Assume table is large if table row count is unknown + use_double_hash = ( + (table_rows is None) or (table_rows == 0) or (table_rows > 100000) + ) + ordering_hash_part = guid.generate_guid("bigframes_ordering_") + ordering_hash_part2 = guid.generate_guid("bigframes_ordering_") ordering_rand_part = guid.generate_guid("bigframes_ordering_") # All inputs into hash must be non-null or resulting hash will be null @@ -1475,25 +1490,30 @@ def _create_total_ordering( else str_values[0] ) full_row_hash = full_row_str.hash().name(ordering_hash_part) + # By modifying value slightly, we get another hash uncorrelated with the first + full_row_hash_p2 = (full_row_str + "_").hash().name(ordering_hash_part2) # Used to disambiguate between identical rows (which will have identical hash) random_value = ibis.random().name(ordering_rand_part) + order_values = ( + [full_row_hash, full_row_hash_p2, random_value] + if use_double_hash + else [full_row_hash, random_value] + ) + original_column_ids = table.columns table_with_ordering = table.select( - itertools.chain(original_column_ids, [full_row_hash, random_value]) + itertools.chain(original_column_ids, order_values) ) - ordering_ref1 = order.ascending_over(ordering_hash_part) - ordering_ref2 = order.ascending_over(ordering_rand_part) ordering = order.ExpressionOrdering( - ordering_value_columns=(ordering_ref1, ordering_ref2), - total_ordering_columns=frozenset([ordering_hash_part, ordering_rand_part]), + ordering_value_columns=tuple( + order.ascending_over(col.get_name()) for col in order_values + ), + total_ordering_columns=frozenset(col.get_name() for col in order_values), ) columns = [table_with_ordering[col] for col in original_column_ids] - hidden_columns = [ - table_with_ordering[ordering_hash_part], - table_with_ordering[ordering_rand_part], - ] + hidden_columns = [table_with_ordering[col.get_name()] for col in 
order_values] return core.ArrayValue.from_ibis( self, table_with_ordering, diff --git a/tests/unit/resources.py b/tests/unit/resources.py index 28b08e49dc..623af93f65 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -44,6 +44,12 @@ def create_bigquery_session( google.auth.credentials.Credentials, instance=True ) + if anonymous_dataset is None: + anonymous_dataset = google.cloud.bigquery.DatasetReference( + "test-project", + "test_dataset", + ) + if bqclient is None: bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) bqclient.project = "test-project" @@ -53,6 +59,10 @@ def create_bigquery_session( table._properties = {} type(table).location = mock.PropertyMock(return_value="test-region") type(table).schema = mock.PropertyMock(return_value=table_schema) + type(table).reference = mock.PropertyMock( + return_value=anonymous_dataset.table("test_table") + ) + type(table).num_rows = mock.PropertyMock(return_value=1000000000) bqclient.get_table.return_value = table if anonymous_dataset is None: diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 4ba47190bd..34f185cafd 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -49,6 +49,16 @@ def test_read_gbq_cached_table(): table, ) + def get_table_mock(table_ref): + table = google.cloud.bigquery.Table( + table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),) + ) + table._properties["numRows"] = "1000000000" + table._properties["location"] = session._location + return table + + session.bqclient.get_table = get_table_mock + with pytest.warns(UserWarning, match=re.escape("use_cache=False")): df = session.read_gbq("my-project.my_dataset.my_table") @@ -137,10 +147,13 @@ def query_mock(query, *args, **kwargs): session.bqclient.query = query_mock - def get_table_mock(dataset_ref): - dataset = google.cloud.bigquery.Dataset(dataset_ref) - dataset.location = session._location - return dataset + def get_table_mock(table_ref): + table = google.cloud.bigquery.Table( + table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),) + ) + table._properties["numRows"] = 1000000000 + table._properties["location"] = session._location + return table session.bqclient.get_table = get_table_mock From 1e7793cdcb56b8c0bcccc1c1ab356bac44454592 Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Fri, 26 Apr 2024 09:24:27 -0700 Subject: [PATCH 13/18] docs: address more technical writers' feedback (#640) --- bigframes/ml/cluster.py | 4 +-- bigframes/ml/decomposition.py | 4 +-- bigframes/ml/forecasting.py | 2 +- bigframes/ml/imported.py | 6 ++-- bigframes/ml/linear_model.py | 8 +++--- bigframes/ml/llm.py | 28 +++++++++---------- bigframes/ml/pipeline.py | 6 ++-- bigframes/ml/remote.py | 10 +++---- .../sklearn/cluster/_kmeans.py | 2 +- .../sklearn/ensemble/_forest.py | 2 +- .../sklearn/linear_model/_base.py | 2 +- .../sklearn/linear_model/_logistic.py | 2 +- .../sklearn/metrics/pairwise.py | 6 ++-- .../bigframes_vendored/sklearn/pipeline.py | 5 ++-- .../sklearn/preprocessing/_encoder.py | 4 +-- .../bigframes_vendored/xgboost/sklearn.py | 2 +- 16 files changed, 46 insertions(+), 47 deletions(-) diff --git a/bigframes/ml/cluster.py b/bigframes/ml/cluster.py index e63764e7bb..e572bb3bfb 100644 --- a/bigframes/ml/cluster.py +++ b/bigframes/ml/cluster.py @@ -175,12 +175,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> KMeans: Args: model_name (str): - the name of the model. 
+ The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - KMeans: saved model.""" + KMeans: Saved model.""" if not self._bqml_model: raise RuntimeError("A model must be fitted before it can be saved") diff --git a/bigframes/ml/decomposition.py b/bigframes/ml/decomposition.py index 0dfb46efaa..01b1fda628 100644 --- a/bigframes/ml/decomposition.py +++ b/bigframes/ml/decomposition.py @@ -169,12 +169,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> PCA: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - PCA: saved model.""" + PCA: Saved model.""" if not self._bqml_model: raise RuntimeError("A model must be fitted before it can be saved") diff --git a/bigframes/ml/forecasting.py b/bigframes/ml/forecasting.py index a7e0c3c0d9..5bd01c8826 100644 --- a/bigframes/ml/forecasting.py +++ b/bigframes/ml/forecasting.py @@ -361,7 +361,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> ARIMAPlus: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. diff --git a/bigframes/ml/imported.py b/bigframes/ml/imported.py index 9198b4eafb..a642fae74d 100644 --- a/bigframes/ml/imported.py +++ b/bigframes/ml/imported.py @@ -89,7 +89,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> TensorFlowModel: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Default to False. @@ -166,7 +166,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> ONNXModel: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. @@ -282,7 +282,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> XGBoostModel: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. diff --git a/bigframes/ml/linear_model.py b/bigframes/ml/linear_model.py index 63462be09f..0c76a39a1c 100644 --- a/bigframes/ml/linear_model.py +++ b/bigframes/ml/linear_model.py @@ -182,12 +182,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> LinearRegression: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - LinearRegression: saved model.""" + LinearRegression: Saved model.""" if not self._bqml_model: raise RuntimeError("A model must be fitted before it can be saved") @@ -347,12 +347,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> LogisticRegression: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. 
Returns: - LogisticRegression: saved model.""" + LogisticRegression: Saved model.""" if not self._bqml_model: raise RuntimeError("A model must be fitted before it can be saved") diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index 4a58152d14..b455e35b67 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -64,8 +64,8 @@ class PaLM2TextGenerator(base.BaseEstimator): BQ session to create the model. If None, use the global default session. connection_name (str or None): Connection to connect with remote service. str of the format ... - if None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach - permission if the connection isn't fully setup. + If None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach + permission if the connection isn't fully set up. max_iterations (Optional[int], Default to 300): The number of steps to run when performing supervised tuning. """ @@ -191,7 +191,7 @@ def fit( Training labels. Returns: - PaLM2TextGenerator: Fitted Estimator. + PaLM2TextGenerator: Fitted estimator. """ X, y = utils.convert_to_dataframe(X, y) @@ -372,12 +372,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> PaLM2TextGenerator: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - PaLM2TextGenerator: saved model.""" + PaLM2TextGenerator: Saved model.""" new_model = self._bqml_model.copy(model_name, replace) return new_model.session.read_gbq_model(model_name) @@ -390,7 +390,7 @@ class PaLM2TextEmbeddingGenerator(base.BaseEstimator): Args: model_name (str, Default to "textembedding-gecko"): The model for text embedding. “textembedding-gecko” returns model embeddings for text inputs. - "textembedding-gecko-multilingual" returns model embeddings for text inputs which support over 100 languages + "textembedding-gecko-multilingual" returns model embeddings for text inputs which support over 100 languages. Default to "textembedding-gecko". version (str or None): Model version. Accepted values are "001", "002", "003", "latest" etc. Will use the default version if unset. @@ -398,8 +398,8 @@ class PaLM2TextEmbeddingGenerator(base.BaseEstimator): session (bigframes.Session or None): BQ session to create the model. If None, use the global default session. connection_name (str or None): - connection to connect with remote service. str of the format ... - if None, use default connection in session context. + Connection to connect with remote service. str of the format ... + If None, use default connection in session context. """ def __init__( @@ -539,12 +539,12 @@ def to_gbq( Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - PaLM2TextEmbeddingGenerator: saved model.""" + PaLM2TextEmbeddingGenerator: Saved model.""" new_model = self._bqml_model.copy(model_name, replace) return new_model.session.read_gbq_model(model_name) @@ -565,8 +565,8 @@ class GeminiTextGenerator(base.BaseEstimator): BQ session to create the model. If None, use the global default session. connection_name (str or None): Connection to connect with remote service. str of the format ... - if None, use default connection in session context. 
BigQuery DataFrame will try to create the connection and attach - permission if the connection isn't fully setup. + If None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach + permission if the connection isn't fully set up. """ def __init__( @@ -719,12 +719,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> GeminiTextGenerator: Args: model_name (str): - the name of the model. + The name of the model. replace (bool, default False): Determine whether to replace if the model already exists. Default to False. Returns: - GeminiTextGenerator: saved model.""" + GeminiTextGenerator: Saved model.""" new_model = self._bqml_model.copy(model_name, replace) return new_model.session.read_gbq_model(model_name) diff --git a/bigframes/ml/pipeline.py b/bigframes/ml/pipeline.py index 5df2378575..61b5a22da9 100644 --- a/bigframes/ml/pipeline.py +++ b/bigframes/ml/pipeline.py @@ -126,12 +126,12 @@ def to_gbq(self, model_name: str, replace: bool = False) -> Pipeline: Args: model_name (str): - the name of the model(pipeline). + The name of the model(pipeline). replace (bool, default False): - whether to replace if the model(pipeline) already exists. Default to False. + Whether to replace if the model(pipeline) already exists. Default to False. Returns: - Pipeline: saved model(pipeline).""" + Pipeline: Saved model(pipeline).""" if not self._estimator._bqml_model: raise RuntimeError("A model must be fitted before it can be saved") diff --git a/bigframes/ml/remote.py b/bigframes/ml/remote.py index 8cf892f536..8fb6d9db4c 100644 --- a/bigframes/ml/remote.py +++ b/bigframes/ml/remote.py @@ -30,12 +30,12 @@ @log_adapter.class_logger class VertexAIModel(base.BaseEstimator): - """Remote model from a Vertex AI https endpoint. User must specify https endpoint, input schema and output schema. - How to deploy a model in Vertex AI https://cloud.google.com/bigquery/docs/bigquery-ml-remote-model-tutorial#Deploy-Model-on-Vertex-AI. + """Remote model from a Vertex AI HTTPS endpoint. User must specify HTTPS endpoint, input schema and output schema. + For more information, see Deploy model on Vertex AI: https://cloud.google.com/bigquery/docs/bigquery-ml-remote-model-tutorial#Deploy-Model-on-Vertex-AI. Args: endpoint (str): - Vertex AI https endpoint. + Vertex AI HTTPS endpoint. input (Mapping): Input schema: `{column_name: column_type}`. Supported types are "bool", "string", "int64", "float64", "array", "array", "array", "array". output (Mapping): @@ -44,8 +44,8 @@ class VertexAIModel(base.BaseEstimator): BQ session to create the model. If None, use the global default session. connection_name (str or None): Connection to connect with remote service. str of the format ... - if None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach - permission if the connection isn't fully setup. + If None, use default connection in session context. BigQuery DataFrame will try to create the connection and attach + permission if the connection isn't fully set up. """ def __init__( diff --git a/third_party/bigframes_vendored/sklearn/cluster/_kmeans.py b/third_party/bigframes_vendored/sklearn/cluster/_kmeans.py index 2a0acc8cfe..386b620f4a 100644 --- a/third_party/bigframes_vendored/sklearn/cluster/_kmeans.py +++ b/third_party/bigframes_vendored/sklearn/cluster/_kmeans.py @@ -75,7 +75,7 @@ def fit( Not used, present here for API consistency by convention. Returns: - KMeans: Fitted Estimator. + KMeans: Fitted estimator. 
""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py index a55b7b80d3..92794bb68e 100644 --- a/third_party/bigframes_vendored/sklearn/ensemble/_forest.py +++ b/third_party/bigframes_vendored/sklearn/ensemble/_forest.py @@ -56,7 +56,7 @@ def fit(self, X, y): Returns: - ForestModel: Fitted Estimator. + ForestModel: Fitted estimator. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/linear_model/_base.py b/third_party/bigframes_vendored/sklearn/linear_model/_base.py index a845b782c0..69f98697af 100644 --- a/third_party/bigframes_vendored/sklearn/linear_model/_base.py +++ b/third_party/bigframes_vendored/sklearn/linear_model/_base.py @@ -109,6 +109,6 @@ def fit( Target values. Will be cast to X's dtype if necessary. Returns: - LinearRegression: Fitted Estimator. + LinearRegression: Fitted estimator. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py b/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py index 494c730a6d..49198eb9bd 100644 --- a/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py +++ b/third_party/bigframes_vendored/sklearn/linear_model/_logistic.py @@ -79,6 +79,6 @@ def fit( Returns: - LogisticRegression: Fitted Estimator. + LogisticRegression: Fitted estimator. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/metrics/pairwise.py b/third_party/bigframes_vendored/sklearn/metrics/pairwise.py index be3d6753a7..7584230be6 100644 --- a/third_party/bigframes_vendored/sklearn/metrics/pairwise.py +++ b/third_party/bigframes_vendored/sklearn/metrics/pairwise.py @@ -21,7 +21,7 @@ def paired_cosine_distances(X, Y) -> bpd.DataFrame: Input data. X and Y are mapped by indexes, must have the same index. Returns: - bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and cosine_distance + bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and cosine_distance. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -36,7 +36,7 @@ def paired_manhattan_distance(X, Y) -> bpd.DataFrame: Input data. X and Y are mapped by indexes, must have the same index. Returns: - bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and manhattan_distance + bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and manhattan_distance. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) @@ -51,6 +51,6 @@ def paired_euclidean_distances(X, Y) -> bpd.DataFrame: Input data. X and Y are mapped by indexes, must have the same index. Returns: - bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and euclidean_distance + bigframes.dataframe.DataFrame: DataFrame with columns of X, Y and euclidean_distance. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/sklearn/pipeline.py b/third_party/bigframes_vendored/sklearn/pipeline.py index 8a98ee4141..96eaa903be 100644 --- a/third_party/bigframes_vendored/sklearn/pipeline.py +++ b/third_party/bigframes_vendored/sklearn/pipeline.py @@ -25,9 +25,8 @@ class Pipeline(BaseEstimator, metaclass=ABCMeta): The final estimator only needs to implement `fit`. 
The purpose of the pipeline is to assemble several steps that can be - cross-validated together while setting different parameters. This - simplifies code and allows for deploying an estimator and peprocessing - together, e.g. with `Pipeline.to_gbq(...).` + cross-validated together while setting different parameters. This simplifies code and allows for + deploying an estimator and preprocessing together, e.g. with `Pipeline.to_gbq(...).` """ def fit( diff --git a/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py b/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py index b883e82249..7cdca9229a 100644 --- a/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py +++ b/third_party/bigframes_vendored/sklearn/preprocessing/_encoder.py @@ -84,6 +84,6 @@ def transform(self, X): The DataFrame or Series to be transformed. Returns: - bigframes.dataframe.DataFrame: The result is categorized as index: number, value: number. - Where index is the position of the dict that seeing the category, and value is 0 or 1.""" + bigframes.dataframe.DataFrame: The result is categorized as index: number, value: number, + where index is the position of the dict seeing the category, and value is 0 or 1.""" raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) diff --git a/third_party/bigframes_vendored/xgboost/sklearn.py b/third_party/bigframes_vendored/xgboost/sklearn.py index 5a2a69dff4..da1396af02 100644 --- a/third_party/bigframes_vendored/xgboost/sklearn.py +++ b/third_party/bigframes_vendored/xgboost/sklearn.py @@ -38,7 +38,7 @@ def fit(self, X, y): Target values. Will be cast to X's dtype if necessary. Returns: - XGBModel: Fitted Estimator. + XGBModel: Fitted estimator. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) From a5c94ec90dcf2c541d7d4b9558a629f935649dd2 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Fri, 26 Apr 2024 10:53:34 -0700 Subject: [PATCH 14/18] feat: Add .cache() method to persist intermediate dataframe (#626) --- bigframes/dataframe.py | 11 +++++++++++ bigframes/ml/core.py | 16 +++++----------- bigframes/series.py | 11 +++++++++++ tests/system/small/test_dataframe.py | 2 +- tests/unit/ml/test_golden_sql.py | 4 ++-- 5 files changed, 30 insertions(+), 14 deletions(-) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 48c4af7a37..092c8ab82f 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3397,6 +3397,17 @@ def _set_block(self, block: blocks.Block): def _get_block(self) -> blocks.Block: return self._block + def cache(self): + """ + Materializes the DataFrame to a temporary table. + + Useful if the dataframe will be used multiple times, as this will avoid recomputating the shared intermediate value. + + Returns: + DataFrame: Self + """ + return self._cached(force=True) + def _cached(self, *, force: bool = False) -> DataFrame: """Materialize dataframe to a temporary table. No-op if the dataframe represents a trivial transformation of an existing materialization. 
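The dataframe.py hunk above adds the public `DataFrame.cache()` entry point on top of the existing `_cached(force=True)` path, and the remaining hunks in this patch switch the ML layer and tests over to it. A minimal usage sketch of how the new method is meant to be called; the table and column names are placeholders:

    import bigframes.pandas as bpd

    df = bpd.read_gbq("my-project.my_dataset.events")
    filtered = df[df["value"] > 0]

    # Materialize the filtered frame to a temporary table once, so the two
    # downstream computations reuse it instead of re-running the filter.
    filtered = filtered.cache()

    total = filtered["value"].sum()
    preview = filtered.head(10).to_pandas()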
diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index 12c881c19a..7b4638157e 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -83,7 +83,7 @@ def distance( """ assert len(x.columns) == 1 and len(y.columns) == 1 - input_data = x._cached().join(y._cached(), how="outer") + input_data = x.cache().join(y.cache(), how="outer") x_column_id, y_column_id = x._block.value_columns[0], y._block.value_columns[0] return self._apply_sql( @@ -310,11 +310,9 @@ def create_model( # Cache dataframes to make sure base table is not a snapshot # cached dataframe creates a full copy, never uses snapshot if y_train is None: - input_data = X_train._cached(force=True) + input_data = X_train.cache() else: - input_data = X_train._cached(force=True).join( - y_train._cached(force=True), how="outer" - ) + input_data = X_train.cache().join(y_train.cache(), how="outer") options.update({"INPUT_LABEL_COLS": y_train.columns.tolist()}) session = X_train._session @@ -354,9 +352,7 @@ def create_llm_remote_model( options = dict(options) # Cache dataframes to make sure base table is not a snapshot # cached dataframe creates a full copy, never uses snapshot - input_data = X_train._cached(force=True).join( - y_train._cached(force=True), how="outer" - ) + input_data = X_train.cache().join(y_train.cache(), how="outer") options.update({"INPUT_LABEL_COLS": y_train.columns.tolist()}) session = X_train._session @@ -389,9 +385,7 @@ def create_time_series_model( options = dict(options) # Cache dataframes to make sure base table is not a snapshot # cached dataframe creates a full copy, never uses snapshot - input_data = X_train._cached(force=True).join( - y_train._cached(force=True), how="outer" - ) + input_data = X_train.cache().join(y_train.cache(), how="outer") options.update({"TIME_SERIES_TIMESTAMP_COL": X_train.columns.tolist()[0]}) options.update({"TIME_SERIES_DATA_COL": y_train.columns.tolist()[0]}) diff --git a/bigframes/series.py b/bigframes/series.py index 5184d4bf1d..3986d38445 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1682,6 +1682,17 @@ def _slice( ), ) + def cache(self): + """ + Materializes the Series to a temporary table. + + Useful if the series will be used multiple times, as this will avoid recomputating the shared intermediate value. 
+ + Returns: + Series: Self + """ + return self._cached(force=True) + def _cached(self, *, force: bool = True) -> Series: self._set_block(self._block.cached(force=force)) return self diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 2a4b53403d..b428207314 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4204,7 +4204,7 @@ def test_df_cached(scalars_df_index): ) df = df[df["rowindex_2"] % 2 == 0] - df_cached_copy = df._cached() + df_cached_copy = df.cache() pandas.testing.assert_frame_equal(df.to_pandas(), df_cached_copy.to_pandas()) diff --git a/tests/unit/ml/test_golden_sql.py b/tests/unit/ml/test_golden_sql.py index bcb220b107..48fb7011ea 100644 --- a/tests/unit/ml/test_golden_sql.py +++ b/tests/unit/ml/test_golden_sql.py @@ -63,7 +63,7 @@ def bqml_model_factory(mocker: pytest_mock.MockerFixture): def mock_y(): mock_y = mock.create_autospec(spec=bpd.DataFrame) mock_y.columns = pd.Index(["input_column_label"]) - mock_y._cached.return_value = mock_y + mock_y.cache.return_value = mock_y return mock_y @@ -83,7 +83,7 @@ def mock_X(mock_y, mock_session): ["index_column_id"], ["index_column_label"], ) - mock_X._cached.return_value = mock_X + mock_X.cache.return_value = mock_X return mock_X From 3aa643f7ab6dd0ff826ca2aafbeef29035d7c912 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Mon, 29 Apr 2024 17:53:04 +0000 Subject: [PATCH 15/18] feat: allow single input type in `remote_function` (#641) * feat: allow single input type in `remote_function` * say sequence instead of list in the remote_function docstring * fix more doc --- bigframes/functions/remote_function.py | 14 +++++---- bigframes/pandas/__init__.py | 2 +- bigframes/session/__init__.py | 7 +++-- samples/snippets/remote_function.py | 4 +-- tests/system/large/test_remote_function.py | 29 +++++++++++++++++++ .../bigframes_vendored/pandas/core/frame.py | 2 +- .../bigframes_vendored/pandas/core/series.py | 8 ++--- 7 files changed, 50 insertions(+), 16 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 4bb667ccc7..f7237c564c 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -24,7 +24,7 @@ import sys import tempfile import textwrap -from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING +from typing import List, NamedTuple, Optional, Sequence, TYPE_CHECKING, Union import ibis import requests @@ -623,7 +623,7 @@ def get_routine_reference( # which has moved as @js to the ibis package # https://github.com/ibis-project/ibis/blob/master/ibis/backends/bigquery/udf/__init__.py def remote_function( - input_types: Sequence[type], + input_types: Union[type, Sequence[type]], output_type: type, session: Optional[Session] = None, bigquery_client: Optional[bigquery.Client] = None, @@ -686,9 +686,10 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types list(type): - List of input data types in the user defined function. - output_type type: + input_types (type or sequence(type)): + Input data type, or sequence of input data types in the user + defined function. + output_type (type): Data type of the output in the user defined function. 
session (bigframes.Session, Optional): BigQuery DataFrames session to use for getting default project, @@ -778,6 +779,9 @@ def remote_function( By default BigQuery DataFrames uses a 10 minute timeout. `None` can be passed to let the cloud functions default timeout take effect. """ + if isinstance(input_types, type): + input_types = [input_types] + import bigframes.pandas as bpd session = session or bpd.get_global_session() diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 71ef4e609e..48a4b0f68d 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -633,7 +633,7 @@ def read_parquet( def remote_function( - input_types: List[type], + input_types: Union[type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 34047ff155..79febcc5d9 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1538,7 +1538,7 @@ def _ibis_to_temp_table( def remote_function( self, - input_types: List[type], + input_types: Union[type, Sequence[type]], output_type: type, dataset: Optional[str] = None, bigquery_connection: Optional[str] = None, @@ -1592,8 +1592,9 @@ def remote_function( `$ gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:CONNECTION_SERVICE_ACCOUNT_ID" --role="roles/run.invoker"`. Args: - input_types (list(type)): - List of input data types in the user defined function. + input_types (type or sequence(type)): + Input data type, or sequence of input data types in the user + defined function. output_type (type): Data type of the output in the user defined function. dataset (str, Optional): diff --git a/samples/snippets/remote_function.py b/samples/snippets/remote_function.py index 61b7dc092a..4db4e67619 100644 --- a/samples/snippets/remote_function.py +++ b/samples/snippets/remote_function.py @@ -47,7 +47,7 @@ def run_remote_function_and_read_gbq_function(project_id: str): # of the penguins, which is a real number, into a category, which is a # string. @bpd.remote_function( - [float], + float, str, reuse=False, ) @@ -91,7 +91,7 @@ def get_bucket(num): # as a remote function. The custom function in this example has external # package dependency, which can be specified via `packages` parameter. 
@bpd.remote_function( - [str], + str, str, reuse=False, packages=["cryptography"], diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 6cae893f9c..eb7cb8308b 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -310,6 +310,35 @@ def add_one(x): ) +@pytest.mark.parametrize( + ("input_types"), + [ + pytest.param([int], id="list-of-int"), + pytest.param(int, id="int"), + ], +) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_input_types(session, scalars_dfs, input_types): + try: + + def add_one(x): + return x + 1 + + remote_add_one = session.remote_function(input_types, int)(add_one) + + scalars_df, scalars_pandas_df = scalars_dfs + + bf_result = scalars_df.int64_too.map(remote_add_one).to_pandas() + pd_result = scalars_pandas_df.int64_too.map(add_one) + + pandas.testing.assert_series_equal(bf_result, pd_result, check_dtype=False) + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, remote_add_one + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_remote_function_explicit_dataset_not_created( session, diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 1669a291c9..c5168cd160 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -3892,7 +3892,7 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: to potentially reuse a previously deployed ``remote_function`` from the same user defined function. - >>> @bpd.remote_function([int], float, reuse=False) + >>> @bpd.remote_function(int, float, reuse=False) ... def minutes_to_hours(x): ... return x/60 diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 0c5b8d4521..4833c41ff7 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -1181,7 +1181,7 @@ def apply( to potentially reuse a previously deployed `remote_function` from the same user defined function. - >>> @bpd.remote_function([int], float, reuse=False) + >>> @bpd.remote_function(int, float, reuse=False) ... def minutes_to_hours(x): ... return x/60 @@ -1208,7 +1208,7 @@ def apply( `packages` param. >>> @bpd.remote_function( - ... [str], + ... str, ... str, ... reuse=False, ... packages=["cryptography"], @@ -3341,7 +3341,7 @@ def mask(self, cond, other): condition is evaluated based on a complicated business logic which cannot be expressed in form of a Series. - >>> @bpd.remote_function([str], bool, reuse=False) + >>> @bpd.remote_function(str, bool, reuse=False) ... def should_mask(name): ... hash = 0 ... for char_ in name: @@ -3860,7 +3860,7 @@ def map( It also accepts a remote function: - >>> @bpd.remote_function([str], str) + >>> @bpd.remote_function(str, str) ... def my_mapper(val): ... vowels = ["a", "e", "i", "o", "u"] ... 
if val: From 03c1b0d8122afe9e56b480100d6207d1228ca576 Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Mon, 29 Apr 2024 11:04:26 -0700 Subject: [PATCH 16/18] perf: Automatically condense internal expression representation (#516) --- bigframes/core/__init__.py | 14 +++++---- bigframes/core/compile/compiled.py | 26 +++++++++++----- bigframes/core/rewrite.py | 48 +++++++++++++++++++++++------- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 04291edbb1..185ce7cd4f 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -183,7 +183,7 @@ def project_to_id(self, expression: ex.Expression, output_id: str): child=self.node, assignments=tuple(exprs), ) - ) + ).merge_projections() def assign(self, source_id: str, destination_id: str) -> ArrayValue: if destination_id in self.column_ids: # Mutate case @@ -208,7 +208,7 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue: child=self.node, assignments=tuple(exprs), ) - ) + ).merge_projections() def assign_constant( self, @@ -242,7 +242,7 @@ def assign_constant( child=self.node, assignments=tuple(exprs), ) - ) + ).merge_projections() def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: selections = ((ex.free_var(col_id), col_id) for col_id in column_ids) @@ -251,7 +251,7 @@ def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: child=self.node, assignments=tuple(selections), ) - ) + ).merge_projections() def drop_columns(self, columns: Iterable[str]) -> ArrayValue: new_projection = ( @@ -264,7 +264,7 @@ def drop_columns(self, columns: Iterable[str]) -> ArrayValue: child=self.node, assignments=tuple(new_projection), ) - ) + ).merge_projections() def aggregate( self, @@ -466,3 +466,7 @@ def _uniform_sampling(self, fraction: float) -> ArrayValue: The row numbers of result is non-deterministic, avoid to use. 
""" return ArrayValue(nodes.RandomSampleNode(self.node, fraction)) + + def merge_projections(self) -> ArrayValue: + new_node = bigframes.core.rewrite.maybe_squash_projection(self.node) + return ArrayValue(new_node) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 88c1006c79..d14a5d3241 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -1050,8 +1050,8 @@ def _hide_column(self, column_id) -> OrderedIR: def _bake_ordering(self) -> OrderedIR: """Bakes ordering expression into the selection, maybe creating hidden columns.""" ordering_expressions = self._ordering.all_ordering_columns - new_exprs = [] - new_baked_cols = [] + new_exprs: list[OrderingExpression] = [] + new_baked_cols: list[ibis_types.Value] = [] for expr in ordering_expressions: if isinstance(expr.scalar_expression, ex.OpExpression): baked_column = self._compile_expression(expr.scalar_expression).name( @@ -1059,18 +1059,28 @@ def _bake_ordering(self) -> OrderedIR: ) new_baked_cols.append(baked_column) new_expr = OrderingExpression( - ex.free_var(baked_column.name), expr.direction, expr.na_last + ex.free_var(baked_column.get_name()), expr.direction, expr.na_last ) new_exprs.append(new_expr) - else: + elif isinstance(expr.scalar_expression, ex.UnboundVariableExpression): + order_col = expr.scalar_expression.id new_exprs.append(expr) - - ordering = self._ordering.with_ordering_columns(new_exprs) + if order_col not in self.column_ids: + new_baked_cols.append( + self._ibis_bindings[expr.scalar_expression.id] + ) + + new_ordering = ExpressionOrdering( + tuple(new_exprs), + self._ordering.integer_encoding, + self._ordering.string_encoding, + self._ordering.total_ordering_columns, + ) return OrderedIR( self._table, columns=self.columns, - hidden_ordering_columns=[*self._hidden_ordering_columns, *new_baked_cols], - ordering=ordering, + hidden_ordering_columns=tuple(new_baked_cols), + ordering=new_ordering, predicates=self._predicates, ) diff --git a/bigframes/core/rewrite.py b/bigframes/core/rewrite.py index 61fe28b7b5..e3a07c04b4 100644 --- a/bigframes/core/rewrite.py +++ b/bigframes/core/rewrite.py @@ -35,16 +35,21 @@ class SquashedSelect: columns: Tuple[Tuple[scalar_exprs.Expression, str], ...] predicate: Optional[scalar_exprs.Expression] ordering: Tuple[order.OrderingExpression, ...] 
+ reverse_root: bool = False @classmethod - def from_node(cls, node: nodes.BigFrameNode) -> SquashedSelect: + def from_node( + cls, node: nodes.BigFrameNode, projections_only: bool = False + ) -> SquashedSelect: if isinstance(node, nodes.ProjectionNode): - return cls.from_node(node.child).project(node.assignments) - elif isinstance(node, nodes.FilterNode): + return cls.from_node(node.child, projections_only=projections_only).project( + node.assignments + ) + elif not projections_only and isinstance(node, nodes.FilterNode): return cls.from_node(node.child).filter(node.predicate) - elif isinstance(node, nodes.ReversedNode): + elif not projections_only and isinstance(node, nodes.ReversedNode): return cls.from_node(node.child).reverse() - elif isinstance(node, nodes.OrderByNode): + elif not projections_only and isinstance(node, nodes.OrderByNode): return cls.from_node(node.child).order_with(node.by) else: selection = tuple( @@ -63,7 +68,9 @@ def project( new_columns = tuple( (expr.bind_all_variables(self.column_lookup), id) for expr, id in projection ) - return SquashedSelect(self.root, new_columns, self.predicate, self.ordering) + return SquashedSelect( + self.root, new_columns, self.predicate, self.ordering, self.reverse_root + ) def filter(self, predicate: scalar_exprs.Expression) -> SquashedSelect: if self.predicate is None: @@ -72,18 +79,24 @@ def filter(self, predicate: scalar_exprs.Expression) -> SquashedSelect: new_predicate = ops.and_op.as_expr( self.predicate, predicate.bind_all_variables(self.column_lookup) ) - return SquashedSelect(self.root, self.columns, new_predicate, self.ordering) + return SquashedSelect( + self.root, self.columns, new_predicate, self.ordering, self.reverse_root + ) def reverse(self) -> SquashedSelect: new_ordering = tuple(expr.with_reverse() for expr in self.ordering) - return SquashedSelect(self.root, self.columns, self.predicate, new_ordering) + return SquashedSelect( + self.root, self.columns, self.predicate, new_ordering, not self.reverse_root + ) def order_with(self, by: Tuple[order.OrderingExpression, ...]): adjusted_orderings = [ order_part.bind_variables(self.column_lookup) for order_part in by ] new_ordering = (*adjusted_orderings, *self.ordering) - return SquashedSelect(self.root, self.columns, self.predicate, new_ordering) + return SquashedSelect( + self.root, self.columns, self.predicate, new_ordering, self.reverse_root + ) def maybe_join( self, right: SquashedSelect, join_def: join_defs.JoinDefinition @@ -126,8 +139,10 @@ def maybe_join( new_columns = remap_names(join_def, lselection, rselection) # Reconstruct ordering + reverse_root = self.reverse_root if join_type == "right": new_ordering = right.ordering + reverse_root = right.reverse_root elif join_type == "outer": if lmask is not None: prefix = order.OrderingExpression(lmask, order.OrderingDirection.DESC) @@ -158,11 +173,15 @@ def maybe_join( new_ordering = self.ordering else: raise ValueError(f"Unexpected join type {join_type}") - return SquashedSelect(self.root, new_columns, new_predicate, new_ordering) + return SquashedSelect( + self.root, new_columns, new_predicate, new_ordering, reverse_root + ) def expand(self) -> nodes.BigFrameNode: # Safest to apply predicates first, as it may filter out inputs that cannot be handled by other expressions root = self.root + if self.reverse_root: + root = nodes.ReversedNode(child=root) if self.predicate: root = nodes.FilterNode(child=root, predicate=self.predicate) if self.ordering: @@ -170,6 +189,15 @@ def expand(self) -> nodes.BigFrameNode: return 
nodes.ProjectionNode(child=root, assignments=self.columns) +def maybe_squash_projection(node: nodes.BigFrameNode) -> nodes.BigFrameNode: + if isinstance(node, nodes.ProjectionNode) and isinstance( + node.child, nodes.ProjectionNode + ): + # Conservative approach, only squash consecutive projections, even though could also squash filters, reorderings + return SquashedSelect.from_node(node, projections_only=True).expand() + return node + + def maybe_rewrite_join(join_node: nodes.JoinNode) -> nodes.BigFrameNode: left_side = SquashedSelect.from_node(join_node.left_child) right_side = SquashedSelect.from_node(join_node.right_child) From cf4ec3af96c28d42e76868c6230a38511052c44e Mon Sep 17 00:00:00 2001 From: Ashley Xu <139821907+ashleyxuu@users.noreply.github.com> Date: Mon, 29 Apr 2024 13:16:16 -0700 Subject: [PATCH 17/18] fix: llm palm score tests (#643) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes internal # 336527025🦕 --- tests/system/load/test_llm.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/system/load/test_llm.py b/tests/system/load/test_llm.py index 835b31955e..fd13662275 100644 --- a/tests/system/load/test_llm.py +++ b/tests/system/load/test_llm.py @@ -49,12 +49,13 @@ def llm_remote_text_df(session, llm_remote_text_pandas_df): return session.read_pandas(llm_remote_text_pandas_df) +@pytest.mark.flaky(retries=2) def test_llm_palm_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_df): model = bigframes.ml.llm.PaLM2TextGenerator( model_name="text-bison", max_iterations=1 ) - df = llm_fine_tune_df_default_index.dropna() + df = llm_fine_tune_df_default_index.dropna().sample(n=100) X_train = df[["prompt"]] y_train = df[["label"]] model.fit(X_train, y_train) @@ -70,6 +71,7 @@ def test_llm_palm_configure_fit(llm_fine_tune_df_default_index, llm_remote_text_ # TODO(ashleyxu b/335492787): After bqml rolled out version control: save, load, check parameters to ensure configuration was kept +@pytest.mark.flaky(retries=2) def test_llm_palm_score(llm_fine_tune_df_default_index): model = bigframes.ml.llm.PaLM2TextGenerator(model_name="text-bison") @@ -89,6 +91,7 @@ def test_llm_palm_score(llm_fine_tune_df_default_index): assert all(col in score_result_col for col in expected_col) +@pytest.mark.flaky(retries=2) def test_llm_palm_score_params(llm_fine_tune_df_default_index): model = bigframes.ml.llm.PaLM2TextGenerator( model_name="text-bison", max_iterations=1 @@ -102,12 +105,10 @@ def test_llm_palm_score_params(llm_fine_tune_df_default_index): ).to_pandas() score_result_col = score_result.columns.to_list() expected_col = [ - "trial_id", "precision", "recall", - "accuracy", "f1_score", - "log_loss", - "roc_auc", + "label", + "evaluation_status", ] assert all(col in score_result_col for col in expected_col) From ac8f40c6df80c906079986042875cd6b57ab576e Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Mon, 29 Apr 2024 
17:11:25 -0700 Subject: [PATCH 18/18] chore(main): release 1.4.0 (#633) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 33 +++++++++++++++++++++++++++++++++ bigframes/version.py | 2 +- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a96c902835..b01e78ec42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,39 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.4.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.3.0...v1.4.0) (2024-04-29) + + +### Features + +* Add .cache() method to persist intermediate dataframe ([#626](https://github.com/googleapis/python-bigquery-dataframes/issues/626)) ([a5c94ec](https://github.com/googleapis/python-bigquery-dataframes/commit/a5c94ec90dcf2c541d7d4b9558a629f935649dd2)) +* Add transpose support for small homogeneously typed DataFrames. ([#621](https://github.com/googleapis/python-bigquery-dataframes/issues/621)) ([054075d](https://github.com/googleapis/python-bigquery-dataframes/commit/054075d448f7de1b3bc1a4631b4e2340643de4ef)) +* Allow single input type in `remote_function` ([#641](https://github.com/googleapis/python-bigquery-dataframes/issues/641)) ([3aa643f](https://github.com/googleapis/python-bigquery-dataframes/commit/3aa643f7ab6dd0ff826ca2aafbeef29035d7c912)) +* Expose gcf max timeout in `remote_function` ([#639](https://github.com/googleapis/python-bigquery-dataframes/issues/639)) ([dfeaad0](https://github.com/googleapis/python-bigquery-dataframes/commit/dfeaad0ae3b3557a9e8ccb21ddbdc55cfd611e0f)) +* Series binary ops compatible with more types ([#618](https://github.com/googleapis/python-bigquery-dataframes/issues/618)) ([518d315](https://github.com/googleapis/python-bigquery-dataframes/commit/518d315487f351c227070c0127382d11381c5e88)) +* Support the `score` method for `PaLM2TextGenerator` ([#634](https://github.com/googleapis/python-bigquery-dataframes/issues/634)) ([3ffc1d2](https://github.com/googleapis/python-bigquery-dataframes/commit/3ffc1d275ae110bffea2f08e63ef75b053764a0c)) + + +### Bug Fixes + +* Allow to_pandas to download more than 10GB ([#637](https://github.com/googleapis/python-bigquery-dataframes/issues/637)) ([ce56495](https://github.com/googleapis/python-bigquery-dataframes/commit/ce5649513b66c5191a56fc1fd29240b5dbe02394)) +* Extend row hash to 128 bits to guarantee unique row id ([#632](https://github.com/googleapis/python-bigquery-dataframes/issues/632)) ([9005c6e](https://github.com/googleapis/python-bigquery-dataframes/commit/9005c6e79297d7130e93a0e632eb3936aa145efe)) +* Llm fine tuning tests ([#627](https://github.com/googleapis/python-bigquery-dataframes/issues/627)) ([4724a1a](https://github.com/googleapis/python-bigquery-dataframes/commit/4724a1a456076d003613d2e964a8dd2d80a09ad9)) +* Llm palm score tests ([#643](https://github.com/googleapis/python-bigquery-dataframes/issues/643)) ([cf4ec3a](https://github.com/googleapis/python-bigquery-dataframes/commit/cf4ec3af96c28d42e76868c6230a38511052c44e)) + + +### Performance Improvements + +* Automatically condense internal expression representation ([#516](https://github.com/googleapis/python-bigquery-dataframes/issues/516)) ([03c1b0d](https://github.com/googleapis/python-bigquery-dataframes/commit/03c1b0d8122afe9e56b480100d6207d1228ca576)) +* Cache transpose to allow performant retranspose ([#635](https://github.com/googleapis/python-bigquery-dataframes/issues/635)) 
([44b738d](https://github.com/googleapis/python-bigquery-dataframes/commit/44b738df07d0ee9d9ae2ced339a123f31139f887)) + + +### Documentation + +* Add supported pandas apis on the main page ([#628](https://github.com/googleapis/python-bigquery-dataframes/issues/628)) ([8d2a51c](https://github.com/googleapis/python-bigquery-dataframes/commit/8d2a51c4079844daba20f414b6c0c0ca030ba1f9)) +* Add the first sample for the Single time-series forecasting from Google Analytics data tutorial ([#623](https://github.com/googleapis/python-bigquery-dataframes/issues/623)) ([2b84c4f](https://github.com/googleapis/python-bigquery-dataframes/commit/2b84c4f173e956ba2c7fcc0ad92785ae95161d8e)) +* Address more technical writers' feedback ([#640](https://github.com/googleapis/python-bigquery-dataframes/issues/640)) ([1e7793c](https://github.com/googleapis/python-bigquery-dataframes/commit/1e7793cdcb56b8c0bcccc1c1ab356bac44454592)) + ## [1.3.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.2.0...v1.3.0) (2024-04-22) diff --git a/bigframes/version.py b/bigframes/version.py index 1f103401e4..e892a8893f 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "1.3.0" +__version__ = "1.4.0"
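
The practical effect of the `remote_function` change in this release is that `input_types` now accepts either a single type or a sequence of types; a bare type is normalized internally to a one-element list before the function is deployed. The doctest-style sketch below follows the convention of the docstrings above and is only a usage sketch: it assumes an authenticated BigQuery DataFrames session and a BigQuery connection with the required IAM roles (deploying a remote function creates real Cloud Functions and BigQuery routine resources), and the function names are illustrative.

    >>> import bigframes.pandas as bpd

    >>> # Single input type, new in 1.4.0
    >>> @bpd.remote_function(int, float, reuse=False)
    ... def celsius_to_fahrenheit(deg_c):
    ...     return deg_c * 1.8 + 32

    >>> # A sequence of input types continues to work as before
    >>> @bpd.remote_function([int], float, reuse=False)
    ... def celsius_to_fahrenheit_seq(deg_c):
    ...     return deg_c * 1.8 + 32

Either form can then be applied to a Series as before, for example `df["temp_c"].apply(celsius_to_fahrenheit)`; only the way the input types are declared has changed.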