diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index 1a2544704c..283f56fd39 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -1232,46 +1232,10 @@ def aggregate_all_and_stack(
                 index_labels=[None],
             ).transpose(original_row_index=pd.Index([None]), single_row_mode=True)
         else:  # axis_n == 1
-            # using offsets as identity to group on.
-            # TODO: Allow to promote identity/total_order columns instead for better perf
-            expr_with_offsets, offset_col = self.expr.promote_offsets()
-            stacked_expr, (_, value_col_ids, passthrough_cols,) = unpivot(
-                expr_with_offsets,
-                row_labels=self.column_labels,
-                unpivot_columns=[tuple(self.value_columns)],
-                passthrough_columns=[*self.index_columns, offset_col],
-            )
-            # these corresponed to passthrough_columns provided to unpivot
-            index_cols = passthrough_cols[:-1]
-            og_offset_col = passthrough_cols[-1]
-            index_aggregations = [
-                (
-                    ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.deref(col_id)),
-                    col_id,
-                )
-                for col_id in index_cols
-            ]
-            # TODO: may need add NullaryAggregation in main_aggregation
-            # when agg add support for axis=1, needed for agg("size", axis=1)
-            assert isinstance(
-                operation, agg_ops.UnaryAggregateOp
-            ), f"Expected a unary operation, but got {operation}. Please report this error and how you got here to the BigQuery DataFrames team (bit.ly/bigframes-feedback)."
-            main_aggregation = (
-                ex.UnaryAggregation(operation, ex.deref(value_col_ids[0])),
-                value_col_ids[0],
-            )
-            # Drop row identity after aggregating over it
-            result_expr = stacked_expr.aggregate(
-                [*index_aggregations, main_aggregation],
-                by_column_ids=[og_offset_col],
-                dropna=dropna,
-            ).drop_columns([og_offset_col])
-            return Block(
-                result_expr,
-                index_columns=index_cols,
-                column_labels=[None],
-                index_labels=self.index.names,
-            )
+            as_array = ops.ToArrayOp().as_expr(*self.value_columns)
+            reduced = ops.ArrayReduceOp(operation).as_expr(as_array)
+            block, result_id = self.project_expr(reduced, None)
+            return block.select_column(result_id)
 
     def aggregate_size(
         self,
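Note on the blocks.py change: the old path materialized row offsets, unpivoted every value column, and re-aggregated grouped by row identity; the new path packs each row's values into one array and reduces that array in place. A minimal pure-Python sketch of the intended semantics (illustrative only; `rows`, `cols`, and `axis1_agg` are made-up names, not bigframes API):

    # Each row becomes one array; the aggregation then runs once per array.
    rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]

    def axis1_agg(rows, cols, agg):
        return [agg([row[c] for c in cols]) for row in rows]

    print(axis1_agg(rows, ["a", "b", "c"], sum))  # [6, 15]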
    bq_sum = _apply_window_if_present(column.sum(), window)
-    return bq_sum.fill_null(ibis_types.literal(0))
+    return bq_sum.coalesce(ibis_types.literal(0))
 
 
 @compile_unary_agg.register
diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
index f3653efc56..969ae2659d 100644
--- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
+++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py
@@ -1201,6 +1201,28 @@ def array_slice_op_impl(x: ibis_types.Value, op: ops.ArraySliceOp):
     return res
 
 
+@scalar_op_compiler.register_nary_op(ops.ToArrayOp, pass_op=False)
+def to_array_op_impl(*values: ibis_types.Value):
+    do_upcast_bool = any(t.type().is_numeric() for t in values)
+    if do_upcast_bool:
+        values = tuple(
+            val.cast(ibis_dtypes.int64) if val.type().is_boolean() else val
+            for val in values
+        )
+    return ibis_api.array(values)
+
+
+@scalar_op_compiler.register_unary_op(ops.ArrayReduceOp, pass_op=True)
+def array_reduce_op_impl(x: ibis_types.Value, op: ops.ArrayReduceOp):
+    import bigframes.core.compile.ibis_compiler.aggregate_compiler as agg_compilers
+
+    return typing.cast(ibis_types.ArrayValue, x).reduce(
+        lambda arr_vals: agg_compilers.compile_unary_agg(
+            op.aggregation, typing.cast(ibis_types.Column, arr_vals)
+        )
+    )
+
+
 # JSON Ops
 @scalar_op_compiler.register_binary_op(ops.JSONSet, pass_op=True)
 def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
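Note on to_array_op_impl: mixing booleans with numerics in one array would otherwise fail element-type unification, hence the upcast to int64. A rough stand-alone sketch of that coercion rule (plain Python, not the ibis implementation):

    def to_array(*values):
        # bool is a subclass of int in Python, so exclude it from the numeric check.
        has_numeric = any(
            isinstance(v, (int, float)) and not isinstance(v, bool) for v in values
        )
        if has_numeric:
            values = tuple(int(v) if isinstance(v, bool) else v for v in values)
        return list(values)

    print(to_array(True, 2.5))    # [1, 2.5]
    print(to_array(True, False))  # [True, False] -- all-bool arrays stay boolean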
diff --git a/bigframes/core/compile/polars/compiler.py b/bigframes/core/compile/polars/compiler.py
index 1bfbe0f734..3316154de7 100644
--- a/bigframes/core/compile/polars/compiler.py
+++ b/bigframes/core/compile/polars/compiler.py
@@ -31,6 +31,7 @@
 import bigframes.dtypes
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
+import bigframes.operations.array_ops as arr_ops
 import bigframes.operations.bool_ops as bool_ops
 import bigframes.operations.comparison_ops as comp_ops
 import bigframes.operations.datetime_ops as dt_ops
@@ -353,6 +354,36 @@ def _(self, op: ops.ScalarOp, input: pl.Expr) -> pl.Expr:
         assert isinstance(op, json_ops.JSONDecode)
         return input.str.json_decode(_DTYPE_MAPPING[op.to_type])
 
+    @compile_op.register(arr_ops.ToArrayOp)
+    def _(self, op: ops.ToArrayOp, *inputs: pl.Expr) -> pl.Expr:
+        return pl.concat_list(*inputs)
+
+    @compile_op.register(arr_ops.ArrayReduceOp)
+    def _(self, op: ops.ArrayReduceOp, input: pl.Expr) -> pl.Expr:
+        # TODO: Unify this with general aggregation compilation?
+        if isinstance(op.aggregation, agg_ops.MinOp):
+            return input.list.min()
+        if isinstance(op.aggregation, agg_ops.MaxOp):
+            return input.list.max()
+        if isinstance(op.aggregation, agg_ops.SumOp):
+            return input.list.sum()
+        if isinstance(op.aggregation, agg_ops.MeanOp):
+            return input.list.mean()
+        if isinstance(op.aggregation, agg_ops.CountOp):
+            return input.list.len()
+        if isinstance(op.aggregation, agg_ops.StdOp):
+            return input.list.std()
+        if isinstance(op.aggregation, agg_ops.VarOp):
+            return input.list.var()
+        if isinstance(op.aggregation, agg_ops.AnyOp):
+            return input.list.any()
+        if isinstance(op.aggregation, agg_ops.AllOp):
+            return input.list.all()
+        else:
+            raise NotImplementedError(
+                f"Haven't implemented array aggregation: {op.aggregation}"
+            )
+
 @dataclasses.dataclass(frozen=True)
 class PolarsAggregateCompiler:
     scalar_compiler = PolarsExpressionCompiler()
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index e10a972790..e5888ace00 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -14,7 +14,13 @@
 
 from __future__ import annotations
 
-from bigframes.operations.array_ops import ArrayIndexOp, ArraySliceOp, ArrayToStringOp
+from bigframes.operations.array_ops import (
+    ArrayIndexOp,
+    ArrayReduceOp,
+    ArraySliceOp,
+    ArrayToStringOp,
+    ToArrayOp,
+)
 from bigframes.operations.base_ops import (
     BinaryOp,
     NaryOp,
@@ -405,4 +411,6 @@
     # Numpy ops mapping
     "NUMPY_TO_BINOP",
     "NUMPY_TO_OP",
+    "ToArrayOp",
+    "ArrayReduceOp",
 ]
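Note on the polars lowering: ToArrayOp maps to pl.concat_list and ArrayReduceOp maps onto the list namespace. A quick self-contained check of that mapping (assumes polars is installed; column names are arbitrary):

    import polars as pl

    df = pl.DataFrame({"a": [1, 4], "b": [2, 5], "c": [3, 6]})
    out = df.select(
        arr=pl.concat_list("a", "b", "c"),                 # ToArrayOp
        row_sum=pl.concat_list("a", "b", "c").list.sum(),  # ArrayReduceOp(SumOp)
    )
    print(out)  # row_sum column is [6, 15]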
diff --git a/bigframes/operations/array_ops.py b/bigframes/operations/array_ops.py
index c1e644fc11..61ada59cc7 100644
--- a/bigframes/operations/array_ops.py
+++ b/bigframes/operations/array_ops.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 import dataclasses
+import functools
 import typing
 
 from bigframes import dtypes
-from bigframes.operations import base_ops
+from bigframes.operations import aggregations, base_ops
 
 
 @dataclasses.dataclass(frozen=True)
@@ -63,3 +64,27 @@ def output_type(self, *input_types):
             return input_type
         else:
             raise TypeError("Input type must be an array or string-like type.")
+
+
+class ToArrayOp(base_ops.NaryOp):
+    name: typing.ClassVar[str] = "array"
+
+    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+        # Very permissive; we may want to require callers to coerce types themselves.
+        common_type = functools.reduce(
+            dtypes.coerce_to_common,
+            input_types,
+        )
+        return dtypes.list_type(common_type)
+
+
+@dataclasses.dataclass(frozen=True)
+class ArrayReduceOp(base_ops.UnaryOp):
+    name: typing.ClassVar[str] = "array_reduce"
+    aggregation: aggregations.AggregateOp
+
+    def output_type(self, *input_types):
+        input_type = input_types[0]
+        assert dtypes.is_array_like(input_type)
+        inner_type = dtypes.get_array_inner_type(input_type)
+        return self.aggregation.output_type(inner_type)
diff --git a/tests/system/small/engines/conftest.py b/tests/system/small/engines/conftest.py
index 4f0f875b34..9699cc6a61 100644
--- a/tests/system/small/engines/conftest.py
+++ b/tests/system/small/engines/conftest.py
@@ -90,3 +90,10 @@ def repeated_data_source(
     repeated_pandas_df: pd.DataFrame,
 ) -> local_data.ManagedArrowTable:
     return local_data.ManagedArrowTable.from_pandas(repeated_pandas_df)
+
+
+@pytest.fixture(scope="module")
+def arrays_array_value(
+    repeated_data_source: local_data.ManagedArrowTable, fake_session: bigframes.Session
+):
+    return ArrayValue.from_managed(repeated_data_source, fake_session)
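Note on ToArrayOp.output_type: the input dtypes are folded pairwise into one common element dtype, so mixed numeric inputs widen rather than error. A toy illustration of the fold (the ordering below is a made-up stand-in for dtypes.coerce_to_common, which handles the real dtype lattice):

    import functools

    def coerce_to_common(t1, t2):  # toy stand-in, not bigframes.dtypes
        order = ["BOOL", "INT64", "FLOAT64"]
        return max(t1, t2, key=order.index)

    common = functools.reduce(coerce_to_common, ["BOOL", "INT64", "FLOAT64"])
    print(f"ARRAY<{common}>")  # ARRAY<FLOAT64>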
+REFERENCE_ENGINE = polars_executor.PolarsExecutor()
+
+
+@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+def test_engines_to_array_op(scalars_array_value: array_value.ArrayValue, engine):
+    # BigQuery won't materialize arrays containing NULL elements, so coalesce to non-null.
+    int64_non_null = ops.coalesce_op.as_expr("int64_col", expression.const(0))
+    bool_col_non_null = ops.coalesce_op.as_expr("bool_col", expression.const(False))
+    float_col_non_null = ops.coalesce_op.as_expr("float64_col", expression.const(0.0))
+    string_col_non_null = ops.coalesce_op.as_expr("string_col", expression.const(""))
+
+    arr, _ = scalars_array_value.compute_values(
+        [
+            ops.ToArrayOp().as_expr(int64_non_null),
+            ops.ToArrayOp().as_expr(
+                int64_non_null, bool_col_non_null, float_col_non_null
+            ),
+            ops.ToArrayOp().as_expr(string_col_non_null, string_col_non_null),
+        ]
+    )
+    assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)
+
+
+@pytest.mark.parametrize("engine", ["polars", "bq"], indirect=True)
+def test_engines_array_reduce_op(arrays_array_value: array_value.ArrayValue, engine):
+    arr, _ = arrays_array_value.compute_values(
+        [
+            ops.ArrayReduceOp(agg_ops.SumOp()).as_expr("float_list_col"),
+            ops.ArrayReduceOp(agg_ops.StdOp()).as_expr("float_list_col"),
+            ops.ArrayReduceOp(agg_ops.MaxOp()).as_expr("date_list_col"),
+            ops.ArrayReduceOp(agg_ops.CountOp()).as_expr("string_list_col"),
+            ops.ArrayReduceOp(agg_ops.AnyOp()).as_expr("bool_list_col"),
+        ]
+    )
+    assert_equivalence_execution(arr.node, REFERENCE_ENGINE, engine)
diff --git a/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py b/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py
index 08bf0d7650..61bafeeca2 100644
--- a/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py
+++ b/third_party/bigframes_vendored/ibis/backends/sql/compilers/bigquery/__init__.py
@@ -699,6 +699,9 @@ def visit_ArrayFilter(self, op, *, arg, body, param):
     def visit_ArrayMap(self, op, *, arg, body, param):
         return self.f.array(sg.select(body).from_(self._unnest(arg, as_=param)))
 
+    def visit_ArrayReduce(self, op, *, arg, body, param):
+        return sg.select(body).from_(self._unnest(arg, as_=param)).subquery()
+
     def visit_ArrayZip(self, op, *, arg):
         lengths = [self.f.array_length(arr) - 1 for arr in arg]
         idx = sg.to_identifier(util.gen_name("bq_arr_idx"))
diff --git a/third_party/bigframes_vendored/ibis/expr/operations/arrays.py b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py
index 638b24a212..8134506255 100644
--- a/third_party/bigframes_vendored/ibis/expr/operations/arrays.py
+++ b/third_party/bigframes_vendored/ibis/expr/operations/arrays.py
@@ -105,6 +105,21 @@ def dtype(self) -> dt.DataType:
         return dt.Array(self.body.dtype)
 
 
+@public
+class ArrayReduce(Value):
+    """Reduce an array to a single value by aggregating over its elements."""
+
+    arg: Value[dt.Array]
+    body: Value
+    param: str
+
+    shape = rlz.shape_like("arg")
+
+    @attribute
+    def dtype(self) -> dt.DataType:
+        return self.body.dtype
+
+
 @public
 class ArrayFilter(Value):
     """Filter array elements with a function."""
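Note on visit_ArrayReduce: unlike visit_ArrayMap directly above it, which wraps the subquery in ARRAY(...) to rebuild an array, reduce returns the bare scalar subquery. Roughly, the emitted BigQuery SQL shapes are (placeholders for illustration, not literal compiler output):

    array_map_sql = "ARRAY(SELECT <body> FROM UNNEST(<arr>) AS <param>)"
    array_reduce_sql = "(SELECT <body> FROM UNNEST(<arr>) AS <param>)"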
diff --git a/third_party/bigframes_vendored/ibis/expr/rewrites.py b/third_party/bigframes_vendored/ibis/expr/rewrites.py
index a85498b30b..b0569846da 100644
--- a/third_party/bigframes_vendored/ibis/expr/rewrites.py
+++ b/third_party/bigframes_vendored/ibis/expr/rewrites.py
@@ -252,7 +252,7 @@ def rewrite_project_input(value, relation):
     # relation
     return value.replace(
         project_wrap_analytic | project_wrap_reduction,
-        filter=p.Value & ~p.WindowFunction,
+        filter=p.Value & ~p.WindowFunction & ~p.ArrayReduce,
         context={"rel": relation},
     )
 
diff --git a/third_party/bigframes_vendored/ibis/expr/types/arrays.py b/third_party/bigframes_vendored/ibis/expr/types/arrays.py
index a8f64490c1..72f01334c1 100644
--- a/third_party/bigframes_vendored/ibis/expr/types/arrays.py
+++ b/third_party/bigframes_vendored/ibis/expr/types/arrays.py
@@ -486,6 +486,24 @@ def map(self, func: Deferred | Callable[[ir.Value], ir.Value]) -> ir.ArrayValue:
         body = resolve(parameter.to_expr())
         return ops.ArrayMap(self, param=parameter.param, body=body).to_expr()
 
+    def reduce(self, func: Deferred | Callable[[ir.Value], ir.Value]) -> ir.Value:
+        if isinstance(func, Deferred):
+            name = "_"
+            resolve = func.resolve
+        elif callable(func):
+            name = next(iter(inspect.signature(func).parameters.keys()))
+            resolve = func
+        else:
+            raise TypeError(
+                f"`func` must be a Deferred or Callable, got `{type(func).__name__}`"
+            )
+
+        parameter = ops.Argument(
+            name=name, shape=self.op().shape, dtype=self.type().value_type
+        )
+        body = resolve(parameter.to_expr())
+        return ops.ArrayReduce(self, param=parameter.param, body=body).to_expr()
+
     def filter(
         self, predicate: Deferred | Callable[[ir.Value], bool | ir.BooleanValue]
     ) -> ir.ArrayValue:
diff --git a/third_party/bigframes_vendored/ibis/expr/types/logical.py b/third_party/bigframes_vendored/ibis/expr/types/logical.py
index 80a8527a04..cc86c747f6 100644
--- a/third_party/bigframes_vendored/ibis/expr/types/logical.py
+++ b/third_party/bigframes_vendored/ibis/expr/types/logical.py
@@ -353,6 +353,9 @@ def resolve_exists_subquery(outer):
             return Deferred(Call(resolve_exists_subquery, _))
         elif len(parents) == 1:
             op = ops.Any(self, where=self._bind_to_parent_table(where))
+        elif len(parents) == 0:
+            # No parent tables; occurs when reducing over array elements.
+            op = ops.Any(self, where=self._bind_to_parent_table(where))
         else:
             raise NotImplementedError(
                 f'Cannot compute "any" for expression of type {type(self)} '
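For context, a usage sketch of the new vendored-ibis surface (shown with a stock ibis import path for brevity; in-repo the module is bigframes_vendored.ibis, and the snippet assumes the fork's reduce() addition above):

    import ibis

    t = ibis.memtable({"xs": [[1.0, 2.0], [3.0]]})
    # reduce() binds each array's elements to a column-like argument `v`; a
    # boolean reduction such as t.xs.reduce(lambda v: (v > 1.5).any()) should
    # exercise the new zero-parent branch added in logical.py above.
    row_total = t.xs.reduce(lambda v: v.sum())
    expr = t.select(total=row_total)
    print(ibis.to_sql(expr, dialect="bigquery"))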