diff --git a/CHANGELOG.md b/CHANGELOG.md index 771f04776e..6e2ab05f47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ [1]: https://pypi.org/project/bigframes/#history +## [0.20.1](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.20.0...v0.20.1) (2024-02-06) + + +### Performance Improvements + +* Make repr cache the block where appropriate ([#350](https://github.com/googleapis/python-bigquery-dataframes/issues/350)) ([068879f](https://github.com/googleapis/python-bigquery-dataframes/commit/068879f97fb1626aca081106150803f832a0cf81)) + + +### Documentation + +* Add a sample to demonstrate the evaluation results ([#364](https://github.com/googleapis/python-bigquery-dataframes/issues/364)) ([cff0919](https://github.com/googleapis/python-bigquery-dataframes/commit/cff09194b2c3a96a1f50e86a38ee59783c2a343b)) +* Fix the `DataFrame.apply` code sample ([#366](https://github.com/googleapis/python-bigquery-dataframes/issues/366)) ([1866a26](https://github.com/googleapis/python-bigquery-dataframes/commit/1866a266f0fa40882b589579654c1ad428b036d8)) + ## [0.20.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.19.2...v0.20.0) (2024-01-30) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 8c399e34ab..ea169dbb74 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -118,7 +118,7 @@ def row_count(self) -> ArrayValue: # Operations def filter_by_id(self, predicate_id: str, keep_null: bool = False) -> ArrayValue: """Filter the table on a given expression, the predicate must be a boolean series aligned with the table expression.""" - predicate = ex.free_var(predicate_id) + predicate: ex.Expression = ex.free_var(predicate_id) if keep_null: predicate = ops.fillna_op.as_expr(predicate, ex.const(True)) return self.filter(predicate) @@ -241,7 +241,7 @@ def drop_columns(self, columns: Iterable[str]) -> ArrayValue: def aggregate( self, - aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]], + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[str] = (), dropna: bool = True, ) -> ArrayValue: @@ -270,14 +270,23 @@ def corr_aggregate( Arguments: corr_aggregations: left_column_id, right_column_id, output_column_id tuples """ + aggregations = tuple( + ( + ex.BinaryAggregation( + agg_ops.CorrOp(), ex.free_var(agg[0]), ex.free_var(agg[1]) + ), + agg[2], + ) + for agg in corr_aggregations + ) return ArrayValue( - nodes.CorrNode(child=self.node, corr_aggregations=tuple(corr_aggregations)) + nodes.AggregateNode(child=self.node, aggregations=aggregations) ) def project_window_op( self, column_name: str, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window_spec: WindowSpec, output_name=None, *, diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 345adb6be3..9cc0a05680 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -38,8 +38,7 @@ def equals(block1: blocks.Block, block2: blocks.Block) -> bool: block1 = block1.reset_index(drop=False) block2 = block2.reset_index(drop=False) - joined, (lmap, rmap) = block1.index.join(block2.index, how="outer") - joined_block = joined._block + joined_block, (lmap, rmap) = block1.join(block2, how="outer") equality_ids = [] for lcol, rcol in zip(block1.value_columns, block2.value_columns): @@ -130,7 +129,7 @@ def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block: if len(index_columns) != 1: raise ValueError("only method 'linear' supports multi-index") xvalues = block.index_columns[0] - if block.index_dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: + if block.index.dtypes[0] not in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: raise ValueError("Can only interpolate on numeric index.") for column in original_columns: @@ -743,14 +742,14 @@ def align_rows( right_block: blocks.Block, join: str = "outer", ): - joined_index, (get_column_left, get_column_right) = left_block.index.join( - right_block.index, how=join + joined_block, (get_column_left, get_column_right) = left_block.join( + right_block, how=join ) left_columns = [get_column_left[col] for col in left_block.value_columns] right_columns = [get_column_right[col] for col in right_block.value_columns] - left_block = joined_index._block.select_columns(left_columns) - right_block = joined_index._block.select_columns(right_columns) + left_block = joined_block.select_columns(left_columns) + right_block = joined_block.select_columns(right_columns) return left_block, right_block diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9e17dc2752..e758e20335 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -26,7 +26,7 @@ import itertools import random import typing -from typing import Iterable, List, Optional, Sequence, Tuple +from typing import Iterable, List, Mapping, Optional, Sequence, Tuple import warnings import google.cloud.bigquery as bigquery @@ -37,7 +37,6 @@ import bigframes.core as core import bigframes.core.expression as ex import bigframes.core.guid as guid -import bigframes.core.indexes as indexes import bigframes.core.join_def as join_defs import bigframes.core.ordering as ordering import bigframes.core.utils @@ -140,10 +139,41 @@ def __init__( self._stats_cache[" ".join(self.index_columns)] = {} + @classmethod + def from_local(cls, data) -> Block: + pd_data = pd.DataFrame(data) + columns = pd_data.columns + + # Make a flattened version to treat as a table. + if len(pd_data.columns.names) > 1: + pd_data.columns = columns.to_flat_index() + + index_labels = list(pd_data.index.names) + # The ArrayValue layer doesn't know about indexes, so make sure indexes + # are real columns with unique IDs. + pd_data = pd_data.reset_index( + names=[f"level_{level}" for level in range(len(index_labels))] + ) + pd_data = pd_data.set_axis( + vendored_pandas_io_common.dedup_names( + list(pd_data.columns), is_potential_multiindex=False + ), + axis="columns", + ) + index_ids = pd_data.columns[: len(index_labels)] + + keys_expr = core.ArrayValue.from_pandas(pd_data) + return cls( + keys_expr, + column_labels=columns, + index_columns=index_ids, + index_labels=index_labels, + ) + @property - def index(self) -> indexes.IndexValue: + def index(self) -> BlockIndexProperties: """Row identities for values in the Block.""" - return indexes.IndexValue(self) + return BlockIndexProperties(self) @functools.cached_property def shape(self) -> typing.Tuple[int, int]: @@ -167,11 +197,6 @@ def index_columns(self) -> Sequence[str]: """Column(s) to use as row labels.""" return self._index_columns - @property - def index_labels(self) -> Sequence[Label]: - """Name of column(s) to use as row labels.""" - return self._index_labels - @property def value_columns(self) -> Sequence[str]: """All value columns, mutually exclusive with index columns.""" @@ -197,13 +222,6 @@ def dtypes( """Returns the dtypes of the value columns.""" return [self.expr.get_column_type(col) for col in self.value_columns] - @property - def index_dtypes( - self, - ) -> Sequence[bigframes.dtypes.Dtype]: - """Returns the dtypes of the index columns.""" - return [self.expr.get_column_type(col) for col in self.index_columns] - @property def session(self) -> core.Session: return self._expr.session @@ -398,7 +416,7 @@ def reorder_levels(self, ids: typing.Sequence[str]): def _to_dataframe(self, result) -> pd.DataFrame: """Convert BigQuery data to pandas DataFrame with specific dtypes.""" - dtypes = dict(zip(self.index_columns, self.index_dtypes)) + dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) return self.session._rows_to_dataframe(result, dtypes) @@ -444,7 +462,7 @@ def try_peek(self, n: int = 20) -> typing.Optional[pd.DataFrame]: def to_pandas_batches(self): """Download results one message at a time.""" - dtypes = dict(zip(self.index_columns, self.index_dtypes)) + dtypes = dict(zip(self.index_columns, self.index.dtypes)) dtypes.update(zip(self.value_columns, self.dtypes)) results_iterator, _ = self.session._execute(self.expr, sorted=True) for arrow_table in results_iterator.to_arrow_iterable( @@ -842,7 +860,7 @@ def filter(self, column_id: str, keep_null: bool = False): def aggregate_all_and_stack( self, - operation: agg_ops.AggregateOp, + operation: agg_ops.UnaryAggregateOp, *, axis: int | str = 0, value_col_id: str = "values", @@ -854,7 +872,8 @@ def aggregate_all_and_stack( axis_n = utils.get_axis_number(axis) if axis_n == 0: aggregations = [ - (col_id, operation, col_id) for col_id in self.value_columns + (ex.UnaryAggregation(operation, ex.free_var(col_id)), col_id) + for col_id in self.value_columns ] index_col_ids = [ guid.generate_guid() for i in range(self.column_labels.nlevels) @@ -884,10 +903,13 @@ def aggregate_all_and_stack( dtype=dtype, ) index_aggregations = [ - (col_id, agg_ops.AnyValueOp(), col_id) + (ex.UnaryAggregation(agg_ops.AnyValueOp(), ex.free_var(col_id)), col_id) for col_id in [*self.index_columns] ] - main_aggregation = (value_col_id, operation, value_col_id) + main_aggregation = ( + ex.UnaryAggregation(operation, ex.free_var(value_col_id)), + value_col_id, + ) result_expr = stacked_expr.aggregate( [*index_aggregations, main_aggregation], by_column_ids=[offset_col], @@ -897,7 +919,7 @@ def aggregate_all_and_stack( result_expr.drop_columns([offset_col]), self.index_columns, column_labels=[None], - index_labels=self.index_labels, + index_labels=self.index.names, ) def select_column(self, id: str) -> Block: @@ -948,7 +970,7 @@ def remap_f(x): def aggregate( self, by_column_ids: typing.Sequence[str] = (), - aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp]] = (), + aggregations: typing.Sequence[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = (), *, dropna: bool = True, ) -> typing.Tuple[Block, typing.Sequence[str]]: @@ -961,10 +983,13 @@ def aggregate( dropna: whether null keys should be dropped """ agg_specs = [ - (input_id, operation, guid.generate_guid()) + ( + ex.UnaryAggregation(operation, ex.free_var(input_id)), + guid.generate_guid(), + ) for input_id, operation in aggregations ] - output_col_ids = [agg_spec[2] for agg_spec in agg_specs] + output_col_ids = [agg_spec[1] for agg_spec in agg_specs] result_expr = self.expr.aggregate(agg_specs, by_column_ids, dropna=dropna) aggregate_labels = self._get_labels_for_columns( @@ -986,7 +1011,7 @@ def aggregate( output_col_ids, ) - def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): + def get_stat(self, column_id: str, stat: agg_ops.UnaryAggregateOp): """Gets aggregates immediately, and caches it""" if stat.name in self._stats_cache[column_id]: return self._stats_cache[column_id][stat.name] @@ -996,7 +1021,10 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): standard_stats = self._standard_stats(column_id) stats_to_fetch = standard_stats if stat in standard_stats else [stat] - aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch] + aggregations = [ + (ex.UnaryAggregation(stat, ex.free_var(column_id)), stat.name) + for stat in stats_to_fetch + ] expr = self.expr.aggregate(aggregations) offset_index_id = guid.generate_guid() expr = expr.promote_offsets(offset_index_id) @@ -1036,13 +1064,13 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str): def summarize( self, column_ids: typing.Sequence[str], - stats: typing.Sequence[agg_ops.AggregateOp], + stats: typing.Sequence[agg_ops.UnaryAggregateOp], ): """Get a list of stats as a deferred block object.""" label_col_id = guid.generate_guid() labels = [stat.name for stat in stats] aggregations = [ - (col_id, stat, f"{col_id}-{stat.name}") + (ex.UnaryAggregation(stat, ex.free_var(col_id)), f"{col_id}-{stat.name}") for stat in stats for col_id in column_ids ] @@ -1058,7 +1086,7 @@ def summarize( labels = self._get_labels_for_columns(column_ids) return Block(expr, column_labels=labels, index_columns=[label_col_id]) - def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.AggregateOp]: + def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.UnaryAggregateOp]: """ Gets a standard set of stats to preemptively fetch for a column if any other stat is fetched. @@ -1069,7 +1097,7 @@ def _standard_stats(self, column_id) -> typing.Sequence[agg_ops.AggregateOp]: """ # TODO: annotate aggregations themself with this information dtype = self.expr.get_column_type(column_id) - stats: list[agg_ops.AggregateOp] = [agg_ops.count_op] + stats: list[agg_ops.UnaryAggregateOp] = [agg_ops.count_op] if dtype not in bigframes.dtypes.UNORDERED_DTYPES: stats += [agg_ops.min_op, agg_ops.max_op] if dtype in bigframes.dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE: @@ -1634,6 +1662,37 @@ def merge( expr = joined_expr.promote_offsets(offset_index_id) return Block(expr, index_columns=[offset_index_id], column_labels=labels) + def join( + self, + other: Block, + *, + how="left", + sort=False, + block_identity_join: bool = False, + ) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]: + if not isinstance(other, Block): + # TODO(swast): We need to improve this error message to be more + # actionable for the user. For example, it's possible they + # could call set_index and try again to resolve this error. + raise ValueError( + f"Tried to join with an unexpected type: {type(other)}. {constants.FEEDBACK_LINK}" + ) + + # TODO(swast): Support cross-joins (requires reindexing). + if how not in {"outer", "left", "right", "inner"}: + raise NotImplementedError( + f"Only how='outer','left','right','inner' currently supported. {constants.FEEDBACK_LINK}" + ) + if self.index.nlevels == other.index.nlevels == 1: + return join_mono_indexed( + self, other, how=how, sort=sort, block_identity_join=block_identity_join + ) + else: + # Always sort mult-index join + return join_multi_indexed( + self, other, how=how, sort=sort, block_identity_join=block_identity_join + ) + def _force_reproject(self) -> Block: """Forces a reprojection of the underlying tables expression. Used to force predicate/order application before subsequent operations.""" return Block( @@ -1670,7 +1729,7 @@ def to_sql_query( return empty lists. """ array_value = self._expr - col_labels, idx_labels = list(self.column_labels), list(self.index_labels) + col_labels, idx_labels = list(self.column_labels), list(self.index.names) old_col_ids, old_idx_ids = list(self.value_columns), list(self.index_columns) if not include_index: @@ -1695,33 +1754,24 @@ def to_sql_query( idx_labels, ) - def cached(self) -> Block: + def cached(self, *, optimize_offsets=False, force: bool = False) -> Block: """Write the block to a session table and create a new block object that references it.""" + # use a heuristic for whether something needs to be cached + if (not force) and self.session._is_trivially_executable(self.expr): + return self + if optimize_offsets: + expr = self.session._cache_with_offsets(self.expr) + else: + expr = self.session._cache_with_cluster_cols( + self.expr, cluster_cols=self.index_columns + ) return Block( - self.session._execute_and_cache(self.expr, cluster_cols=self.index_columns), + expr, index_columns=self.index_columns, column_labels=self.column_labels, - index_labels=self.index_labels, + index_labels=self.index.names, ) - def resolve_index_level(self, level: LevelsType) -> typing.Sequence[str]: - if utils.is_list_like(level): - levels = list(level) - else: - levels = [level] - resolved_level_ids = [] - for level_ref in levels: - if isinstance(level_ref, int): - resolved_level_ids.append(self.index_columns[level_ref]) - elif isinstance(level_ref, typing.Hashable): - matching_ids = self.index_name_to_col_id.get(level_ref, []) - if len(matching_ids) != 1: - raise ValueError("level name cannot be found or is ambiguous") - resolved_level_ids.append(matching_ids[0]) - else: - raise ValueError(f"Unexpected level: {level_ref}") - return resolved_level_ids - def _is_monotonic( self, column_ids: typing.Union[str, Sequence[str]], increasing: bool ) -> bool: @@ -1778,42 +1828,301 @@ def _is_monotonic( return result -def block_from_local(data) -> Block: - pd_data = pd.DataFrame(data) - columns = pd_data.columns +class BlockIndexProperties: + """Accessor for the index-related block properties.""" + + def __init__(self, block: Block): + self._block = block + + @property + def _expr(self) -> core.ArrayValue: + return self._block.expr + + @property + def name(self) -> Label: + return self._block._index_labels[0] + + @property + def names(self) -> typing.Sequence[Label]: + return self._block._index_labels - # Make a flattened version to treat as a table. - if len(pd_data.columns.names) > 1: - pd_data.columns = columns.to_flat_index() + @property + def nlevels(self) -> int: + return len(self._block._index_columns) - index_labels = list(pd_data.index.names) - # The ArrayValue layer doesn't know about indexes, so make sure indexes - # are real columns with unique IDs. - pd_data = pd_data.reset_index( - names=[f"level_{level}" for level in range(len(index_labels))] + @property + def dtypes( + self, + ) -> typing.Sequence[bigframes.dtypes.Dtype]: + return [ + self._block.expr.get_column_type(col) for col in self._block.index_columns + ] + + @property + def session(self) -> core.Session: + return self._expr.session + + @property + def column_ids(self) -> Sequence[str]: + """Column(s) to use as row labels.""" + return self._block._index_columns + + def __repr__(self) -> str: + """Converts an Index to a string.""" + # TODO(swast): Add a timeout here? If the query is taking a long time, + # maybe we just print the job metadata that we have so far? + # TODO(swast): Avoid downloading the whole index by using job + # metadata, like we do with DataFrame. + preview = self.to_pandas() + return repr(preview) + + def to_pandas(self) -> pd.Index: + """Executes deferred operations and downloads the results.""" + # Project down to only the index column. So the query can be cached to visualize other data. + index_columns = list(self._block.index_columns) + dtypes = dict(zip(index_columns, self.dtypes)) + expr = self._expr.select_columns(index_columns) + results, _ = self.session._execute(expr) + df = expr.session._rows_to_dataframe(results, dtypes) + df = df.set_index(index_columns) + index = df.index + index.names = list(self._block._index_labels) + return index + + def resolve_level(self, level: LevelsType) -> typing.Sequence[str]: + if utils.is_list_like(level): + levels = list(level) + else: + levels = [level] + resolved_level_ids = [] + for level_ref in levels: + if isinstance(level_ref, int): + resolved_level_ids.append(self._block.index_columns[level_ref]) + elif isinstance(level_ref, typing.Hashable): + matching_ids = self._block.index_name_to_col_id.get(level_ref, []) + if len(matching_ids) != 1: + raise ValueError("level name cannot be found or is ambiguous") + resolved_level_ids.append(matching_ids[0]) + else: + raise ValueError(f"Unexpected level: {level_ref}") + return resolved_level_ids + + def resolve_level_exact(self: BlockIndexProperties, label: Label) -> str: + matches = self._block.index_name_to_col_id.get(label, []) + if len(matches) > 1: + raise ValueError(f"Ambiguous index level name {label}") + if len(matches) == 0: + raise ValueError(f"Cannot resolve index level name {label}") + return matches[0] + + def is_uniquely_named(self: BlockIndexProperties): + return len(set(self.names)) == len(self.names) + + +def join_mono_indexed( + left: Block, + right: Block, + *, + how="left", + sort=False, + block_identity_join: bool = False, +) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]: + left_expr = left.expr + right_expr = right.expr + left_mappings = [ + join_defs.JoinColumnMapping( + source_table=join_defs.JoinSide.LEFT, + source_id=id, + destination_id=guid.generate_guid(), + ) + for id in left_expr.column_ids + ] + right_mappings = [ + join_defs.JoinColumnMapping( + source_table=join_defs.JoinSide.RIGHT, + source_id=id, + destination_id=guid.generate_guid(), + ) + for id in right_expr.column_ids + ] + + join_def = join_defs.JoinDefinition( + conditions=( + join_defs.JoinCondition(left.index_columns[0], right.index_columns[0]), + ), + mappings=(*left_mappings, *right_mappings), + type=how, + ) + combined_expr = left_expr.join( + right_expr, + join_def=join_def, + allow_row_identity_join=(not block_identity_join), ) - pd_data = pd_data.set_axis( - vendored_pandas_io_common.dedup_names( - list(pd_data.columns), is_potential_multiindex=False + get_column_left = join_def.get_left_mapping() + get_column_right = join_def.get_right_mapping() + # Drop original indices from each side. and used the coalesced combination generated by the join. + left_index = get_column_left[left.index_columns[0]] + right_index = get_column_right[right.index_columns[0]] + # Drop original indices from each side. and used the coalesced combination generated by the join. + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, [left_index], [right_index], how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [ordering.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) + block = Block( + combined_expr, + index_columns=coalesced_join_cols, + column_labels=[*left.column_labels, *right.column_labels], + index_labels=[left.index.name] + if left.index.name == right.index.name + else [None], + ) + return ( + block, + (get_column_left, get_column_right), + ) + + +def join_multi_indexed( + left: Block, + right: Block, + *, + how="left", + sort=False, + block_identity_join: bool = False, +) -> Tuple[Block, Tuple[Mapping[str, str], Mapping[str, str]],]: + if not (left.index.is_uniquely_named() and right.index.is_uniquely_named()): + raise ValueError("Joins not supported on indices with non-unique level names") + + common_names = [name for name in left.index.names if name in right.index.names] + if len(common_names) == 0: + raise ValueError("Cannot join without a index level in common.") + + left_only_names = [ + name for name in left.index.names if name not in right.index.names + ] + right_only_names = [ + name for name in right.index.names if name not in left.index.names + ] + + left_join_ids = [left.index.resolve_level_exact(name) for name in common_names] + right_join_ids = [right.index.resolve_level_exact(name) for name in common_names] + + names_fully_match = len(left_only_names) == 0 and len(right_only_names) == 0 + + left_expr = left.expr + right_expr = right.expr + + left_mappings = [ + join_defs.JoinColumnMapping( + source_table=join_defs.JoinSide.LEFT, + source_id=id, + destination_id=guid.generate_guid(), + ) + for id in left_expr.column_ids + ] + right_mappings = [ + join_defs.JoinColumnMapping( + source_table=join_defs.JoinSide.RIGHT, + source_id=id, + destination_id=guid.generate_guid(), + ) + for id in right_expr.column_ids + ] + + join_def = join_defs.JoinDefinition( + conditions=tuple( + join_defs.JoinCondition(left, right) + for left, right in zip(left_join_ids, right_join_ids) ), - axis="columns", + mappings=(*left_mappings, *right_mappings), + type=how, ) - index_ids = pd_data.columns[: len(index_labels)] - keys_expr = core.ArrayValue.from_pandas(pd_data) - return Block( - keys_expr, - column_labels=columns, - index_columns=index_ids, + combined_expr = left_expr.join( + right_expr, + join_def=join_def, + # If we're only joining on a subset of the index columns, we need to + # perform a true join. + allow_row_identity_join=(names_fully_match and not block_identity_join), + ) + get_column_left = join_def.get_left_mapping() + get_column_right = join_def.get_right_mapping() + left_ids_post_join = [get_column_left[id] for id in left_join_ids] + right_ids_post_join = [get_column_right[id] for id in right_join_ids] + # Drop original indices from each side. and used the coalesced combination generated by the join. + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, left_ids_post_join, right_ids_post_join, how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [ordering.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) + + if left.index.nlevels == 1: + index_labels = right.index.names + elif right.index.nlevels == 1: + index_labels = left.index.names + else: + index_labels = [*common_names, *left_only_names, *right_only_names] + + def resolve_label_id(label: Label) -> str: + # if name is shared between both blocks, coalesce the values + if label in common_names: + return coalesced_join_cols[common_names.index(label)] + if label in left_only_names: + return get_column_left[left.index.resolve_level_exact(label)] + if label in right_only_names: + return get_column_right[right.index.resolve_level_exact(label)] + raise ValueError(f"Unexpected label: {label}") + + index_columns = [resolve_label_id(label) for label in index_labels] + + block = Block( + combined_expr, + index_columns=index_columns, + column_labels=[*left.column_labels, *right.column_labels], index_labels=index_labels, ) + return ( + block, + (get_column_left, get_column_right), + ) + + +def coalesce_columns( + expr: core.ArrayValue, + left_ids: typing.Sequence[str], + right_ids: typing.Sequence[str], + how: str, +) -> Tuple[core.ArrayValue, Sequence[str]]: + result_ids = [] + for left_id, right_id in zip(left_ids, right_ids): + if how == "left" or how == "inner": + result_ids.append(left_id) + expr = expr.drop_columns([right_id]) + elif how == "right": + result_ids.append(right_id) + expr = expr.drop_columns([left_id]) + elif how == "outer": + coalesced_id = guid.generate_guid() + expr = expr.project_to_id( + ops.coalesce_op.as_expr(left_id, right_id), coalesced_id + ) + expr = expr.drop_columns([left_id, right_id]) + result_ids.append(coalesced_id) + else: + raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}") + return expr, result_ids def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]): original_block = block result_ids = [] for idx_id, idx_dtype, target_dtype in zip( - block.index_columns, block.index_dtypes, dtypes + block.index_columns, block.index.dtypes, dtypes ): if idx_dtype != target_dtype: block, result_id = block.apply_unary_op(idx_id, ops.AsTypeOp(target_dtype)) @@ -1826,10 +2135,12 @@ def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]): expr, index_columns=result_ids, column_labels=original_block.column_labels, - index_labels=original_block.index_labels, + index_labels=original_block.index.names, ) +### Schema alignment Utils +### TODO: Pull out to separate module? def _align_block_to_schema( block: Block, schema: dict[Label, bigframes.dtypes.Dtype] ) -> Block: diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 044c33799e..0dbc0e7310 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -21,31 +21,63 @@ import pandas as pd import bigframes.constants as constants +import bigframes.core.compile.scalar_op_compiler as scalar_compilers +import bigframes.core.expression as ex import bigframes.core.window_spec as window_spec import bigframes.dtypes as dtypes import bigframes.operations.aggregations as agg_ops import third_party.bigframes_vendored.ibis.expr.operations as vendored_ibis_ops +scalar_compiler = scalar_compilers.scalar_op_compiler -def compile_unary_aggregate( - op: agg_ops.AggregateOp, input: ibis_types.Column + +def compile_aggregate( + aggregate: ex.Aggregation, + bindings: typing.Dict[str, ibis_types.Value], ) -> ibis_types.Value: - return compile_agg(op, input) + if isinstance(aggregate, ex.UnaryAggregation): + input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) + return compile_unary_agg( + aggregate.op, + input, + ) + elif isinstance(aggregate, ex.BinaryAggregation): + left = scalar_compiler.compile_expression(aggregate.left, bindings=bindings) + right = scalar_compiler.compile_expression(aggregate.right, bindings=bindings) + return compile_binary_agg(aggregate.op, left, right) + else: + raise ValueError(f"Unexpected aggregation: {aggregate}") -def compile_unary_analytic( - op: agg_ops.WindowOp, input: ibis_types.Column, window: window_spec.WindowSpec +def compile_analytic( + aggregate: ex.Aggregation, + window: window_spec.WindowSpec, + bindings: typing.Dict[str, ibis_types.Value], +) -> ibis_types.Value: + if isinstance(aggregate, ex.UnaryAggregation): + input = scalar_compiler.compile_expression(aggregate.arg, bindings=bindings) + return compile_unary_agg(aggregate.op, input, window) + elif isinstance(aggregate, ex.BinaryAggregation): + raise NotImplementedError("binary analytic operations not yet supported") + else: + raise ValueError(f"Unexpected analytic operation: {aggregate}") + + +@functools.singledispatch +def compile_binary_agg( + op: agg_ops.WindowOp, + input: ibis_types.Column, + window: Optional[window_spec.WindowSpec] = None, ) -> ibis_types.Value: - return compile_agg(op, input, window) + raise ValueError(f"Can't compile unrecognized operation: {op}") @functools.singledispatch -def compile_agg( +def compile_unary_agg( op: agg_ops.WindowOp, input: ibis_types.Column, window: Optional[window_spec.WindowSpec] = None, ) -> ibis_types.Value: - """Defines transformation but isn't cached, always use compile_node instead""" raise ValueError(f"Can't compile unrecognized operation: {op}") @@ -66,7 +98,10 @@ def constrained_op(op, column: ibis_types.Column, window=None): return constrained_op -@compile_agg.register +### Specific Op implementations Below + + +@compile_unary_agg.register @numeric_op def _( op: agg_ops.SumOp, column: ibis_types.NumericColumn, window=None @@ -78,7 +113,7 @@ def _( ) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _( op: agg_ops.MedianOp, column: ibis_types.NumericColumn, window=None @@ -96,7 +131,7 @@ def _( return cast(ibis_types.NumericValue, column.approx_median()) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _( op: agg_ops.ApproxQuartilesOp, column: ibis_types.NumericColumn, window=None @@ -109,11 +144,11 @@ def _( ) value = vendored_ibis_ops.ApproximateMultiQuantile( column, num_bins=4 # type: ignore - ).to_expr()[op._quartile] + ).to_expr()[op.quartile] return cast(ibis_types.NumericValue, value) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _( op: agg_ops.MeanOp, column: ibis_types.NumericColumn, window=None @@ -121,7 +156,7 @@ def _( return _apply_window_if_present(column.mean(), window) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _( op: agg_ops.ProductOp, column: ibis_types.NumericColumn, window=None @@ -158,29 +193,29 @@ def _( return float_result.cast(column.type()) # type: ignore -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.MaxOp, column: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(column.max(), window) -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.MinOp, column: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(column.min(), window) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _(op: agg_ops.StdOp, x: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).std(), window) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _(op: agg_ops.VarOp, x: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(cast(ibis_types.NumericColumn, x).var(), window) -@compile_agg.register +@compile_unary_agg.register @numeric_op def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present( @@ -188,35 +223,34 @@ def _(op: agg_ops.PopVarOp, x: ibis_types.Column, window=None) -> ibis_types.Val ) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.CountOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.count(), window) -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): out = ibis.case() - - if op._bins_int > 0: + if isinstance(op.bins, int): col_min = _apply_window_if_present(x.min(), window) col_max = _apply_window_if_present(x.max(), window) - bin_width = (col_max - col_min) / op._bins + bin_width = (col_max - col_min) / op.bins - if op._labels is False: - for this_bin in range(op._bins_int - 1): + if op.labels is False: + for this_bin in range(op.bins - 1): out = out.when( x <= (col_min + (this_bin + 1) * bin_width), dtypes.literal_to_ibis_scalar( this_bin, force_dtype=pd.Int64Dtype() ), ) - out = out.when(x.notnull(), op._bins - 1) + out = out.when(x.notnull(), op.bins - 1) else: interval_struct = None adj = (col_max - col_min) * 0.001 - for this_bin in range(op._bins_int): + for this_bin in range(op.bins): left_edge = ( col_min + this_bin * bin_width - (0 if this_bin > 0 else adj) ) @@ -228,30 +262,32 @@ def _(op: agg_ops.CutOp, x: ibis_types.Column, window=None): } ) - if this_bin < op._bins_int - 1: + if this_bin < op.bins - 1: out = out.when( x <= (col_min + (this_bin + 1) * bin_width), interval_struct, ) else: out = out.when(x.notnull(), interval_struct) - else: - for interval in op._bins: - condition = (x > interval.left) & (x <= interval.right) + else: # Interpret as intervals + for interval in op.bins: + left = dtypes.literal_to_ibis_scalar(interval[0]) + right = dtypes.literal_to_ibis_scalar(interval[1]) + condition = (x > left) & (x <= right) interval_struct = ibis.struct( - {"left_exclusive": interval.left, "right_inclusive": interval.right} + {"left_exclusive": left, "right_inclusive": right} ) out = out.when(condition, interval_struct) return out.end() -@compile_agg.register +@compile_unary_agg.register @numeric_op def _( self: agg_ops.QcutOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: - if isinstance(self._quantiles, int): - quantiles_ibis = dtypes.literal_to_ibis_scalar(self._quantiles) + if isinstance(self.quantiles, int): + quantiles_ibis = dtypes.literal_to_ibis_scalar(self.quantiles) percent_ranks = cast( ibis_types.FloatingColumn, _apply_window_if_present(column.percent_rank(), window), @@ -264,10 +300,10 @@ def _( _apply_window_if_present(column.percent_rank(), window), ) out = ibis.case() - first_ibis_quantile = dtypes.literal_to_ibis_scalar(self._quantiles[0]) + first_ibis_quantile = dtypes.literal_to_ibis_scalar(self.quantiles[0]) out = out.when(percent_ranks < first_ibis_quantile, None) - for bucket_n in range(len(self._quantiles) - 1): - ibis_quantile = dtypes.literal_to_ibis_scalar(self._quantiles[bucket_n + 1]) + for bucket_n in range(len(self.quantiles) - 1): + ibis_quantile = dtypes.literal_to_ibis_scalar(self.quantiles[bucket_n + 1]) out = out.when( percent_ranks <= ibis_quantile, dtypes.literal_to_ibis_scalar(bucket_n, force_dtype=pd.Int64Dtype()), @@ -276,21 +312,21 @@ def _( return out.end() # type: ignore -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.NuniqueOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.nunique(), window) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.AnyValueOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: return _apply_window_if_present(column.arbitrary(), window) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.RankOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: @@ -298,7 +334,7 @@ def _( return _apply_window_if_present(column.rank(), window) + 1 -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.DenseRankOp, column: ibis_types.Column, window=None ) -> ibis_types.IntegerValue: @@ -306,12 +342,12 @@ def _( return _apply_window_if_present(column.dense_rank(), window) + 1 -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(column.first(), window) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.FirstNonNullOp, column: ibis_types.Column, window=None ) -> ibis_types.Value: @@ -320,12 +356,12 @@ def _( ) -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.LastOp, column: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(column.last(), window) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.LastNonNullOp, column: ibis_types.Column, window=None ) -> ibis_types.Value: @@ -334,18 +370,18 @@ def _( ) -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.ShiftOp, column: ibis_types.Column, window=None) -> ibis_types.Value: - if op._periods == 0: # No-op + if op.periods == 0: # No-op return column - if op._periods > 0: - return _apply_window_if_present(column.lag(op._periods), window) - return _apply_window_if_present(column.lead(-op._periods), window) + if op.periods > 0: + return _apply_window_if_present(column.lag(op.periods), window) + return _apply_window_if_present(column.lead(-op.periods), window) -@compile_agg.register +@compile_unary_agg.register def _(op: agg_ops.DiffOp, column: ibis_types.Column, window=None) -> ibis_types.Value: - shifted = compile_agg(agg_ops.ShiftOp(op._periods), column, window) + shifted = compile_unary_agg(agg_ops.ShiftOp(op.periods), column, window) if column.type().is_boolean(): return cast(ibis_types.BooleanColumn, column) != cast( ibis_types.BooleanColumn, shifted @@ -358,7 +394,7 @@ def _(op: agg_ops.DiffOp, column: ibis_types.Column, window=None) -> ibis_types. raise TypeError(f"Cannot perform diff on type{column.type()}") -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.AllOp, column: ibis_types.Column, window=None ) -> ibis_types.BooleanValue: @@ -370,7 +406,7 @@ def _( ) -@compile_agg.register +@compile_unary_agg.register def _( op: agg_ops.AnyOp, column: ibis_types.Column, window=None ) -> ibis_types.BooleanValue: @@ -382,6 +418,19 @@ def _( ) +@compile_binary_agg.register +def _( + op: agg_ops.CorrOp, left: ibis_types.Column, right: ibis_types.Column, window=None +) -> ibis_types.NumericValue: + # Will be null if all inputs are null. Pandas defaults to zero sum though. + left_numeric = cast(ibis_types.NumericColumn, left) + right_numeric = cast(ibis_types.NumericColumn, right) + bq_corr = _apply_window_if_present( + left_numeric.corr(right_numeric, how="pop"), window + ) + return cast(ibis_types.NumericColumn, bq_corr) + + def _apply_window_if_present(value: ibis_types.Value, window): return value.over(window) if (window is not None) else value diff --git a/bigframes/core/compile/analytic_compiler.py b/bigframes/core/compile/analytic_compiler.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index c867eaf680..969437939f 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -435,7 +435,7 @@ def unpivot( def aggregate( self, - aggregations: typing.Sequence[typing.Tuple[str, agg_ops.AggregateOp, str]], + aggregations: typing.Sequence[typing.Tuple[ex.Aggregation, str]], by_column_ids: typing.Sequence[str] = (), dropna: bool = True, ) -> OrderedIR: @@ -447,9 +447,10 @@ def aggregate( dropna: whether null keys should be dropped """ table = self._to_ibis_expr() + bindings = {col: table[col] for col in self.column_ids} stats = { - col_out: agg_compiler.compile_agg(agg_op, table[col_in]) - for col_in, agg_op, col_out in aggregations + col_out: agg_compiler.compile_aggregate(aggregate, bindings) + for aggregate, col_out in aggregations } if by_column_ids: result = table.group_by(by_column_ids).aggregate(**stats) @@ -488,35 +489,6 @@ def aggregate( ordering=ordering, ) - def corr_aggregate( - self, corr_aggregations: typing.Sequence[typing.Tuple[str, str, str]] - ) -> OrderedIR: - """ - Get correlations between each lef_column_id and right_column_id, stored in the respective output_column_id. - This uses BigQuery's CORR under the hood, and thus only Pearson's method is used. - Arguments: - corr_aggregations: left_column_id, right_column_id, output_column_id tuples - """ - table = self._to_ibis_expr() - stats = { - col_out: table[col_left].corr(table[col_right], how="pop") - for col_left, col_right, col_out in corr_aggregations - } - aggregates = {**stats, ORDER_ID_COLUMN: ibis_types.literal(0)} - result = table.aggregate(**aggregates) - # Ordering is irrelevant for single-row output, but set ordering id regardless as other ops(join etc.) expect it. - ordering = ExpressionOrdering( - ordering_value_columns=tuple([OrderingColumnReference(ORDER_ID_COLUMN)]), - total_ordering_columns=frozenset([ORDER_ID_COLUMN]), - integer_encoding=IntegerEncoding(is_encoded=True, is_sequential=True), - ) - return OrderedIR( - result, - columns=[result[col_id] for col_id in [*stats.keys()]], - hidden_ordering_columns=[result[ORDER_ID_COLUMN]], - ordering=ordering, - ) - def _uniform_sampling(self, fraction: float) -> UnorderedIR: """Sampling the table on given fraction. @@ -792,7 +764,7 @@ def promote_offsets(self, col_id: str) -> OrderedIR: def project_window_op( self, column_name: str, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window_spec: WindowSpec, output_name=None, *, @@ -810,8 +782,11 @@ def project_window_op( """ column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name)) window = self._ibis_window_from_spec(window_spec, allow_ties=op.handles_ties) + bindings = {col: self._get_ibis_column(col) for col in self.column_ids} - window_op = agg_compiler.compile_unary_analytic(op, column, window) + window_op = agg_compiler.compile_analytic( + ex.UnaryAggregation(op, ex.free_var(column_name)), window, bindings=bindings + ) clauses = [] if op.skips_nulls and not never_skip_nulls: @@ -819,15 +794,19 @@ def project_window_op( if window_spec.min_periods: if op.skips_nulls: # Most operations do not count NULL values towards min_periods - observation_count = agg_compiler.compile_unary_analytic( - agg_ops.count_op, column, window + observation_count = agg_compiler.compile_analytic( + ex.UnaryAggregation(agg_ops.count_op, ex.free_var(column_name)), + window, + bindings=bindings, ) else: # Operations like count treat even NULLs as valid observations for the sake of min_periods # notnull is just used to convert null values to non-null (FALSE) values to be counted denulled_value = typing.cast(ibis_types.BooleanColumn, column.notnull()) - observation_count = agg_compiler.compile_unary_analytic( - agg_ops.count_op, denulled_value, window + observation_count = agg_compiler.compile_analytic( + ex.UnaryAggregation(agg_ops.count_op, ex.free_var("_denulled")), + window, + bindings={**bindings, "_denulled": denulled_value}, ) clauses.append( ( diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 2ec00f7073..4ced85352c 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -162,12 +162,6 @@ def compile_aggregate(node: nodes.AggregateNode, ordered: bool = True): return result if ordered else result.to_unordered() -@_compile_node.register -def compile_corr(node: nodes.CorrNode, ordered: bool = True): - result = compile_unordered_ir(node.child).corr_aggregate(node.corr_aggregations) - return result if ordered else result.to_unordered() - - @_compile_node.register def compile_window(node: nodes.WindowOpNode, ordered: bool = True): result = compile_ordered_ir(node.child).project_window_op( diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index d1be644439..ec9e698412 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -18,19 +18,47 @@ import dataclasses import itertools import typing +from typing import Union import bigframes.dtypes as dtypes import bigframes.operations +import bigframes.operations.aggregations as agg_ops def const(value: typing.Hashable, dtype: dtypes.ExpressionType = None) -> Expression: return ScalarConstantExpression(value, dtype or dtypes.infer_literal_type(value)) -def free_var(id: str) -> Expression: +def free_var(id: str) -> UnboundVariableExpression: return UnboundVariableExpression(id) +@dataclasses.dataclass(frozen=True) +class Aggregation(abc.ABC): + """Represents windowing or aggregation over a column.""" + + op: agg_ops.WindowOp = dataclasses.field() + + +@dataclasses.dataclass(frozen=True) +class UnaryAggregation(Aggregation): + op: agg_ops.UnaryWindowOp = dataclasses.field() + arg: Union[ + UnboundVariableExpression, ScalarConstantExpression + ] = dataclasses.field() + + +@dataclasses.dataclass(frozen=True) +class BinaryAggregation(Aggregation): + op: agg_ops.BinaryAggregateOp = dataclasses.field() + left: Union[ + UnboundVariableExpression, ScalarConstantExpression + ] = dataclasses.field() + right: Union[ + UnboundVariableExpression, ScalarConstantExpression + ] = dataclasses.field() + + @dataclasses.dataclass(frozen=True) class Expression(abc.ABC): """An expression represents a computation taking N scalar inputs and producing a single output scalar.""" diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index ab6b15e7b9..9a0889b041 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -268,7 +268,7 @@ def _agg_string(self, func: str) -> df.DataFrame: return dataframe if self._as_index else self._convert_index(dataframe) def _agg_dict(self, func: typing.Mapping) -> df.DataFrame: - aggregations: typing.List[typing.Tuple[str, agg_ops.AggregateOp]] = [] + aggregations: typing.List[typing.Tuple[str, agg_ops.UnaryAggregateOp]] = [] column_labels = [] want_aggfunc_level = any(utils.is_list_like(aggs) for aggs in func.values()) @@ -384,7 +384,7 @@ def _column_type(self, col_id: str) -> dtypes.Dtype: return dtype def _aggregate_all( - self, aggregate_op: agg_ops.AggregateOp, numeric_only: bool = False + self, aggregate_op: agg_ops.UnaryAggregateOp, numeric_only: bool = False ) -> df.DataFrame: aggregated_col_ids = self._aggregated_columns(numeric_only=numeric_only) aggregations = [(col_id, aggregate_op) for col_id in aggregated_col_ids] @@ -600,7 +600,7 @@ def expanding(self, min_periods: int = 1) -> windows.Window: is_series=True, ) - def _aggregate(self, aggregate_op: agg_ops.AggregateOp) -> series.Series: + def _aggregate(self, aggregate_op: agg_ops.UnaryAggregateOp) -> series.Series: result_block, _ = self._block.aggregate( self._by_col_ids, ((self._value_column, aggregate_op),), diff --git a/bigframes/core/indexes/__init__.py b/bigframes/core/indexes/__init__.py index 184a9ce262..6419d0985a 100644 --- a/bigframes/core/indexes/__init__.py +++ b/bigframes/core/indexes/__init__.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from bigframes.core.indexes.index import Index, IndexValue +from bigframes.core.indexes.index import Index __all__ = [ "Index", - "IndexValue", ] diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 78a4fc6f0b..3ae4fbe24a 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -17,23 +17,19 @@ from __future__ import annotations import typing -from typing import Hashable, Mapping, Optional, Sequence, Tuple, Union +from typing import Hashable, Optional, Sequence, Union import google.cloud.bigquery as bigquery import numpy as np import pandas import bigframes.constants as constants -import bigframes.core as core import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.expression as ex -import bigframes.core.guid -import bigframes.core.join_def as join_defs import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dtypes -import bigframes.dtypes as bf_dtypes import bigframes.formatting_helpers as formatter import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -125,12 +121,12 @@ def shape(self) -> typing.Tuple[int]: @property def dtype(self): - return self._block.index_dtypes[0] if self.nlevels == 1 else np.dtype("O") + return self._block.index.dtypes[0] if self.nlevels == 1 else np.dtype("O") @property def dtypes(self) -> pandas.Series: return pandas.Series( - data=self._block.index_dtypes, index=self._block.index_labels # type:ignore + data=self._block.index.dtypes, index=self._block.index.names # type:ignore ) @property @@ -422,10 +418,10 @@ def _apply_unary_expr( block, result_id = block.project_expr(op.rename({unbound_variable: col})) result_ids.append(result_id) - block = block.set_index(result_ids, index_labels=self._block.index_labels) + block = block.set_index(result_ids, index_labels=self._block.index.names) return Index(block) - def _apply_aggregation(self, op: agg_ops.AggregateOp) -> typing.Any: + def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> typing.Any: if self.nlevels > 1: raise NotImplementedError(f"Multiindex does not yet support {op.name}") column_id = self._block.index_columns[0] @@ -450,7 +446,7 @@ def to_pandas(self) -> pandas.Index: pandas.Index: A pandas Index with all of the labels from this Index. """ - return IndexValue(self._block).to_pandas() + return self._block.index.to_pandas() def to_numpy(self, dtype=None, **kwargs) -> np.ndarray: return self.to_pandas().to_numpy(dtype, **kwargs) @@ -490,286 +486,3 @@ def names(self, values: typing.Sequence[blocks.Label]): new_block = self._whole_frame._get_block().with_index_labels(values) self._whole_frame._set_block(new_block) self._block = new_block - - -class IndexValue: - """An immutable index.""" - - def __init__(self, block: blocks.Block): - self._block = block - - @property - def _expr(self) -> core.ArrayValue: - return self._block.expr - - @property - def name(self) -> blocks.Label: - return self._block._index_labels[0] - - @property - def names(self) -> typing.Sequence[blocks.Label]: - return self._block._index_labels - - @property - def nlevels(self) -> int: - return len(self._block._index_columns) - - @property - def dtypes( - self, - ) -> typing.Sequence[typing.Union[bf_dtypes.Dtype, np.dtype[typing.Any]]]: - return self._block.index_dtypes - - @property - def session(self) -> core.Session: - return self._expr.session - - def to_pandas(self) -> pandas.Index: - """Executes deferred operations and downloads the results.""" - # Project down to only the index column. So the query can be cached to visualize other data. - index_columns = list(self._block.index_columns) - dtypes = dict(zip(index_columns, self.dtypes)) - expr = self._expr.select_columns(index_columns) - results, _ = self.session._execute(expr) - df = expr.session._rows_to_dataframe(results, dtypes) - df = df.set_index(index_columns) - index = df.index - index.names = list(self._block._index_labels) - return index - - def join( - self, - other: IndexValue, - *, - how="left", - sort=False, - block_identity_join: bool = False, - ) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: - if not isinstance(other, IndexValue): - # TODO(swast): We need to improve this error message to be more - # actionable for the user. For example, it's possible they - # could call set_index and try again to resolve this error. - raise ValueError( - f"Tried to join with an unexpected type: {type(other)}. {constants.FEEDBACK_LINK}" - ) - - # TODO(swast): Support cross-joins (requires reindexing). - if how not in {"outer", "left", "right", "inner"}: - raise NotImplementedError( - f"Only how='outer','left','right','inner' currently supported. {constants.FEEDBACK_LINK}" - ) - if self.nlevels == other.nlevels == 1: - return join_mono_indexed( - self, other, how=how, sort=sort, block_identity_join=block_identity_join - ) - else: - # Always sort mult-index join - return join_multi_indexed( - self, other, how=how, sort=sort, block_identity_join=block_identity_join - ) - - def resolve_level_name(self: IndexValue, label: blocks.Label) -> str: - matches = self._block.index_name_to_col_id.get(label, []) - if len(matches) > 1: - raise ValueError(f"Ambiguous index level name {label}") - if len(matches) == 0: - raise ValueError(f"Cannot resolve index level name {label}") - return matches[0] - - def is_uniquely_named(self: IndexValue): - return len(set(self.names)) == len(self.names) - - -def join_mono_indexed( - left: IndexValue, - right: IndexValue, - *, - how="left", - sort=False, - block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: - left_expr = left._block.expr - right_expr = right._block.expr - left_mappings = [ - join_defs.JoinColumnMapping( - source_table=join_defs.JoinSide.LEFT, - source_id=id, - destination_id=bigframes.core.guid.generate_guid(), - ) - for id in left_expr.column_ids - ] - right_mappings = [ - join_defs.JoinColumnMapping( - source_table=join_defs.JoinSide.RIGHT, - source_id=id, - destination_id=bigframes.core.guid.generate_guid(), - ) - for id in right_expr.column_ids - ] - - join_def = join_defs.JoinDefinition( - conditions=( - join_defs.JoinCondition( - left._block.index_columns[0], right._block.index_columns[0] - ), - ), - mappings=(*left_mappings, *right_mappings), - type=how, - ) - combined_expr = left_expr.join( - right_expr, - join_def=join_def, - allow_row_identity_join=(not block_identity_join), - ) - get_column_left = join_def.get_left_mapping() - get_column_right = join_def.get_right_mapping() - # Drop original indices from each side. and used the coalesced combination generated by the join. - left_index = get_column_left[left._block.index_columns[0]] - right_index = get_column_right[right._block.index_columns[0]] - # Drop original indices from each side. and used the coalesced combination generated by the join. - combined_expr, coalesced_join_cols = coalesce_columns( - combined_expr, [left_index], [right_index], how=how - ) - if sort: - combined_expr = combined_expr.order_by( - [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] - ) - block = blocks.Block( - combined_expr, - index_columns=coalesced_join_cols, - column_labels=[*left._block.column_labels, *right._block.column_labels], - index_labels=[left.name] if left.name == right.name else [None], - ) - return ( - typing.cast(IndexValue, block.index), - (get_column_left, get_column_right), - ) - - -def join_multi_indexed( - left: IndexValue, - right: IndexValue, - *, - how="left", - sort=False, - block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: - if not (left.is_uniquely_named() and right.is_uniquely_named()): - raise ValueError("Joins not supported on indices with non-unique level names") - - common_names = [name for name in left.names if name in right.names] - if len(common_names) == 0: - raise ValueError("Cannot join without a index level in common.") - - left_only_names = [name for name in left.names if name not in right.names] - right_only_names = [name for name in right.names if name not in left.names] - - left_join_ids = [left.resolve_level_name(name) for name in common_names] - right_join_ids = [right.resolve_level_name(name) for name in common_names] - - names_fully_match = len(left_only_names) == 0 and len(right_only_names) == 0 - - left_expr = left._block.expr - right_expr = right._block.expr - - left_mappings = [ - join_defs.JoinColumnMapping( - source_table=join_defs.JoinSide.LEFT, - source_id=id, - destination_id=bigframes.core.guid.generate_guid(), - ) - for id in left_expr.column_ids - ] - right_mappings = [ - join_defs.JoinColumnMapping( - source_table=join_defs.JoinSide.RIGHT, - source_id=id, - destination_id=bigframes.core.guid.generate_guid(), - ) - for id in right_expr.column_ids - ] - - join_def = join_defs.JoinDefinition( - conditions=tuple( - join_defs.JoinCondition(left, right) - for left, right in zip(left_join_ids, right_join_ids) - ), - mappings=(*left_mappings, *right_mappings), - type=how, - ) - - combined_expr = left_expr.join( - right_expr, - join_def=join_def, - # If we're only joining on a subset of the index columns, we need to - # perform a true join. - allow_row_identity_join=(names_fully_match and not block_identity_join), - ) - get_column_left = join_def.get_left_mapping() - get_column_right = join_def.get_right_mapping() - left_ids_post_join = [get_column_left[id] for id in left_join_ids] - right_ids_post_join = [get_column_right[id] for id in right_join_ids] - # Drop original indices from each side. and used the coalesced combination generated by the join. - combined_expr, coalesced_join_cols = coalesce_columns( - combined_expr, left_ids_post_join, right_ids_post_join, how=how - ) - if sort: - combined_expr = combined_expr.order_by( - [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] - ) - - if left.nlevels == 1: - index_labels = right.names - elif right.nlevels == 1: - index_labels = left.names - else: - index_labels = [*common_names, *left_only_names, *right_only_names] - - def resolve_label_id(label: blocks.Label) -> str: - # if name is shared between both blocks, coalesce the values - if label in common_names: - return coalesced_join_cols[common_names.index(label)] - if label in left_only_names: - return get_column_left[left.resolve_level_name(label)] - if label in right_only_names: - return get_column_right[right.resolve_level_name(label)] - raise ValueError(f"Unexpected label: {label}") - - index_columns = [resolve_label_id(label) for label in index_labels] - - block = blocks.Block( - combined_expr, - index_columns=index_columns, - column_labels=[*left._block.column_labels, *right._block.column_labels], - index_labels=index_labels, - ) - return ( - typing.cast(IndexValue, block.index), - (get_column_left, get_column_right), - ) - - -def coalesce_columns( - expr: core.ArrayValue, - left_ids: typing.Sequence[str], - right_ids: typing.Sequence[str], - how: str, -) -> Tuple[core.ArrayValue, Sequence[str]]: - result_ids = [] - for left_id, right_id in zip(left_ids, right_ids): - if how == "left" or how == "inner": - result_ids.append(left_id) - expr = expr.drop_columns([right_id]) - elif how == "right": - result_ids.append(right_id) - expr = expr.drop_columns([left_id]) - elif how == "outer": - coalesced_id = bigframes.core.guid.generate_guid() - expr = expr.project_to_id( - ops.coalesce_op.as_expr(left_id, right_id), coalesced_id - ) - expr = expr.drop_columns([left_id, right_id]) - result_ids.append(coalesced_id) - else: - raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}") - return expr, result_ids diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index e1882c3684..f637177a94 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -50,6 +50,19 @@ def deterministic(self) -> bool: """Whether this node will evaluates deterministically.""" return True + @property + def row_preserving(self) -> bool: + """Whether this node preserves input rows.""" + return True + + @property + def non_local(self) -> bool: + """ + Whether this node combines information across multiple rows instead of processing rows independently. + Used as an approximation for whether the expression may require shuffling to execute (and therefore be expensive). + """ + return False + @property def child_nodes(self) -> typing.Sequence[BigFrameNode]: """Direct children of this node""" @@ -104,6 +117,14 @@ class JoinNode(BigFrameNode): join: JoinDefinition allow_row_identity_join: bool = True + @property + def row_preserving(self) -> bool: + return False + + @property + def non_local(self) -> bool: + return True + @property def child_nodes(self) -> typing.Sequence[BigFrameNode]: return (self.left_child, self.right_child) @@ -184,11 +205,19 @@ def __hash__(self): def peekable(self) -> bool: return False + @property + def non_local(self) -> bool: + return False + @dataclass(frozen=True) class FilterNode(UnaryNode): predicate: ex.Expression + @property + def row_preserving(self) -> bool: + return False + def __hash__(self): return self._node_hash @@ -218,31 +247,29 @@ def __hash__(self): return self._node_hash -# TODO: Merge RowCount and Corr into Aggregate Node +# TODO: Merge RowCount into Aggregate Node? +# Row count can be compute from table metadata sometimes, so it is a bit special. @dataclass(frozen=True) class RowCountNode(UnaryNode): - pass + @property + def row_preserving(self) -> bool: + return False + + @property + def non_local(self) -> bool: + return True @dataclass(frozen=True) class AggregateNode(UnaryNode): - aggregations: typing.Tuple[typing.Tuple[str, agg_ops.AggregateOp, str], ...] + aggregations: typing.Tuple[typing.Tuple[ex.Aggregation, str], ...] by_column_ids: typing.Tuple[str, ...] = tuple([]) dropna: bool = True - def __hash__(self): - return self._node_hash - @property - def peekable(self) -> bool: + def row_preserving(self) -> bool: return False - -# TODO: Unify into aggregate -@dataclass(frozen=True) -class CorrNode(UnaryNode): - corr_aggregations: typing.Tuple[typing.Tuple[str, str, str], ...] - def __hash__(self): return self._node_hash @@ -250,11 +277,15 @@ def __hash__(self): def peekable(self) -> bool: return False + @property + def non_local(self) -> bool: + return True + @dataclass(frozen=True) class WindowOpNode(UnaryNode): column_name: str - op: agg_ops.WindowOp + op: agg_ops.UnaryWindowOp window_spec: window.WindowSpec output_name: typing.Optional[str] = None never_skip_nulls: bool = False @@ -267,6 +298,10 @@ def __hash__(self): def peekable(self) -> bool: return False + @property + def non_local(self) -> bool: + return True + @dataclass(frozen=True) class ReprojectOpNode(UnaryNode): @@ -290,6 +325,14 @@ class UnpivotNode(UnaryNode): def __hash__(self): return self._node_hash + @property + def row_preserving(self) -> bool: + return False + + @property + def non_local(self) -> bool: + return True + @property def peekable(self) -> bool: return False @@ -303,5 +346,9 @@ class RandomSampleNode(UnaryNode): def deterministic(self) -> bool: return False + @property + def row_preserving(self) -> bool: + return False + def __hash__(self): return self._node_hash diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index 3ab89e0213..1fd5ab4e37 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -92,6 +92,14 @@ class ExpressionOrdering: # Therefore, any modifications(or drops) done to these columns must result in hidden copies being made. total_ordering_columns: frozenset[str] = field(default_factory=frozenset) + @classmethod + def from_offset_col(cls, col: str) -> ExpressionOrdering: + return ExpressionOrdering( + (OrderingColumnReference(col),), + integer_encoding=IntegerEncoding(True, is_sequential=True), + total_ordering_columns=frozenset({col}), + ) + def with_non_sequential(self): """Create a copy that is marked as non-sequential. diff --git a/bigframes/core/reshape/__init__.py b/bigframes/core/reshape/__init__.py index 4a3bb16a39..ffbba10936 100644 --- a/bigframes/core/reshape/__init__.py +++ b/bigframes/core/reshape/__init__.py @@ -104,8 +104,7 @@ def concat( block_list = [obj._block for obj in objs] block = block_list[0] for rblock in block_list[1:]: - combined_index, _ = block.index.join(rblock.index, how=join) - block = combined_index._block + block, _ = block.join(rblock, how=join) return bigframes.dataframe.DataFrame(block) @@ -123,10 +122,14 @@ def cut( raise ValueError("`bins` should be a positive integer.") if isinstance(bins, Iterable): - if not isinstance(bins, pd.IntervalIndex): - bins = pd.IntervalIndex.from_tuples(list(bins)) - - if bins.is_overlapping: + if isinstance(bins, pd.IntervalIndex): + as_index: pd.IntervalIndex = bins + bins = tuple((bin.left.item(), bin.right.item()) for bin in bins) + else: + as_index = pd.IntervalIndex.from_tuples(list(bins)) + bins = tuple(bins) + + if as_index.is_overlapping: raise ValueError("Overlapping IntervalIndex is not accepted.") if labels is not None and labels is not False: @@ -149,6 +152,8 @@ def qcut( ) -> bigframes.series.Series: if isinstance(q, int) and q <= 0: raise ValueError("`q` should be a positive integer.") + if utils.is_list_like(q): + q = tuple(q) if labels is not False: raise NotImplementedError( @@ -163,7 +168,7 @@ def qcut( block, nullity_id = block.apply_unary_op(x._value_column, ops.notnull_op) block, result = block.apply_window_op( x._value_column, - agg_ops.QcutOp(q), + agg_ops.QcutOp(q), # type: ignore window_spec=core.WindowSpec( grouping_keys=(nullity_id,), ordering=(order.OrderingColumnReference(x._value_column),), diff --git a/bigframes/core/traversal.py b/bigframes/core/traversal.py new file mode 100644 index 0000000000..b038ee6599 --- /dev/null +++ b/bigframes/core/traversal.py @@ -0,0 +1,27 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bigframes.core.nodes as nodes + + +def is_trivially_executable(node: nodes.BigFrameNode) -> bool: + if local_only(node): + return True + children_trivial = all(is_trivially_executable(child) for child in node.child_nodes) + self_trivial = (not node.non_local) and (node.row_preserving) + return children_trivial and self_trivial + + +def local_only(node: nodes.BigFrameNode) -> bool: + return all(isinstance(node, nodes.ReadLocalNode) for node in node.roots) diff --git a/bigframes/core/window/__init__.py b/bigframes/core/window/__init__.py index 240715b6df..8711625f88 100644 --- a/bigframes/core/window/__init__.py +++ b/bigframes/core/window/__init__.py @@ -64,7 +64,7 @@ def min(self): def _apply_aggregate( self, - op: agg_ops.AggregateOp, + op: agg_ops.UnaryAggregateOp, ): block = self._block labels = [block.col_id_to_label[col] for col in self._value_column_ids] diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2a20a4aabb..2729d23701 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -140,22 +140,19 @@ def __init__( other_block = other._block.with_column_labels([key]) # Pandas will keep original sorting if all indices are aligned. # We cannot detect this easily however, and so always sort on index - result_index, _ = block.index.join( # type:ignore - other_block.index, how="outer", sort=True + block, _ = block.join( # type:ignore + other_block, how="outer", sort=True ) - block = result_index._block if block: if index is not None: bf_index = indexes.Index(index) idx_block = bf_index._block idx_cols = idx_block.index_columns - join_idx, (_, r_mapping) = block.reset_index().index.join( - bf_index._block.reset_index().index, how="inner" - ) - block = join_idx._block.set_index( - [r_mapping[idx_col] for idx_col in idx_cols] + block, (_, r_mapping) = block.reset_index().join( + bf_index._block.reset_index(), how="inner" ) + block = block.set_index([r_mapping[idx_col] for idx_col in idx_cols]) if columns: block = block.select_columns(list(columns)) # type:ignore if dtype: @@ -182,7 +179,7 @@ def __init__( if isinstance(dt, pandas.ArrowDtype) ) ): - self._block = blocks.block_from_local(pd_dataframe) + self._block = blocks.Block.from_local(pd_dataframe) elif session: self._block = session.read_pandas(pd_dataframe)._get_block() else: @@ -538,8 +535,8 @@ def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame: combined_index, ( get_column_left, get_column_right, - ) = self._block.index.join(key._block.index, how="left") - block = combined_index._block + ) = self._block.join(key._block, how="left") + block = combined_index filter_col_id = get_column_right[key._value_column] block = block.filter(filter_col_id) block = block.drop_columns([filter_col_id]) @@ -592,6 +589,8 @@ def __repr__(self) -> str: max_results = opts.max_rows if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) + + self._cached() # TODO(swast): pass max_columns and get the true column count back. Maybe # get 1 more column than we have requested so that pandas can add the # ... for us? @@ -629,6 +628,8 @@ def _repr_html_(self) -> str: max_results = bigframes.options.display.max_rows if opts.repr_mode == "deferred": return formatter.repr_query_job_html(self.query_job) + + self._cached() # TODO(swast): pass max_columns and get the true column count back. Maybe # get 1 more column than we have requested so that pandas can add the # ... for us? @@ -717,13 +718,12 @@ def _apply_series_binop( f"Row Series operations haven't been supported. {constants.FEEDBACK_LINK}" ) - joined_index, (get_column_left, get_column_right) = self._block.index.join( - other._block.index, how=how + block, (get_column_left, get_column_right) = self._block.join( + other._block, how=how ) series_column_id = other._value_column series_col = get_column_right[series_column_id] - block = joined_index._block for column_id, label in zip( self._block.value_columns, self._block.column_labels ): @@ -748,8 +748,8 @@ def _apply_dataframe_binop( reverse: bool = False, ) -> DataFrame: # Join rows - joined_index, (get_column_left, get_column_right) = self._block.index.join( - other._block.index, how=how + block, (get_column_left, get_column_right) = self._block.join( + other._block, how=how ) # join columns schema # indexers will be none for exact match @@ -758,7 +758,6 @@ def _apply_dataframe_binop( ) binop_result_ids = [] - block = joined_index._block column_indices = zip( lcol_indexer if (lcol_indexer is not None) else range(len(columns)), @@ -1198,12 +1197,10 @@ def drop( def _drop_by_index(self, index: indexes.Index) -> DataFrame: block = index._block block, ordering_col = block.promote_offsets() - joined_index, (get_column_left, get_column_right) = self._block.index.join( - block.index - ) + joined_index, (get_column_left, get_column_right) = self._block.join(block) new_ordering_col = get_column_right[ordering_col] - drop_block = joined_index._block + drop_block = joined_index drop_block, drop_col = drop_block.apply_unary_op( new_ordering_col, ops.isnull_op, @@ -1262,7 +1259,7 @@ def reorder_levels(self, order: LevelsType, axis: int | str = 0): raise ValueError("Columns must be a multiindex to reorder levels.") def _resolve_levels(self, level: LevelsType) -> typing.Sequence[str]: - return self._block.resolve_index_level(level) + return self._block.index.resolve_level(level) def rename(self, *, columns: Mapping[blocks.Label, blocks.Label]) -> DataFrame: block = self._block.rename(columns=columns) @@ -1345,16 +1342,16 @@ def _assign_single_item_listlike(self, k: str, v: Sequence) -> DataFrame: raise ValueError( "Assigning listlike to a first column under multiindex is not supported." ) - result_block = new_column_block.with_index_labels(self._block.index_labels) + result_block = new_column_block.with_index_labels(self._block.index.names) result_block = result_block.with_column_labels([k]) else: - result_index, (get_column_left, get_column_right,) = self_block.index.join( - new_column_block.index, how="left", block_identity_join=True - ) - result_block = result_index._block + result_block, ( + get_column_left, + get_column_right, + ) = self_block.join(new_column_block, how="left", block_identity_join=True) result_block = result_block.set_index( [get_column_left[col_id] for col_id in original_index_column_ids], - index_labels=self._block.index_labels, + index_labels=self._block.index.names, ) src_col = get_column_right[new_column_block.value_columns[0]] # Check to see if key exists, and modify in place @@ -1382,14 +1379,13 @@ def _assign_scalar(self, label: str, value: Union[int, float]) -> DataFrame: def _assign_series_join_on_index( self, label: str, series: bigframes.series.Series ) -> DataFrame: - joined_index, (get_column_left, get_column_right) = self._block.index.join( - series._block.index, how="left" + block, (get_column_left, get_column_right) = self._block.join( + series._block, how="left" ) column_ids = [ get_column_left[col_id] for col_id in self._block.cols_matching_label(label) ] - block = joined_index._block source_column = get_column_right[series._value_column] # Replace each column matching the label @@ -2307,10 +2303,8 @@ def join( return left._perform_join_by_index(right, how=how) def _perform_join_by_index(self, other: DataFrame, *, how: str = "left"): - combined_index, _ = self._block.index.join( - other._block.index, how=how, block_identity_join=True - ) - return DataFrame(combined_index._block) + block, _ = self._block.join(other._block, how=how, block_identity_join=True) + return DataFrame(block) def rolling(self, window: int, min_periods=None) -> bigframes.core.window.Window: # To get n size window, need current row and n-1 preceding rows. @@ -2380,17 +2374,14 @@ def _groupby_series( col_ids: typing.Sequence[str] = [] for key in by: if isinstance(key, bigframes.series.Series): - combined_index, ( + block, ( get_column_left, get_column_right, - ) = block.index.join( - key._block.index, how="inner" if dropna else "left" - ) + ) = block.join(key._block, how="inner" if dropna else "left") col_ids = [ *[get_column_left[value] for value in col_ids], get_column_right[key._value_column], ] - block = combined_index._block else: # Interpret as index level or column name col_matches = block.label_to_col_id.get(key, []) @@ -3100,8 +3091,12 @@ def _set_block(self, block: blocks.Block): def _get_block(self) -> blocks.Block: return self._block - def _cached(self) -> DataFrame: - self._set_block(self._block.cached()) + def _cached(self, *, force: bool = False) -> DataFrame: + """Materialize dataframe to a temporary table. + No-op if the dataframe represents a trivial transformation of an existing materialization. + Force=True is used for BQML integration where need to copy data rather than use snapshot. + """ + self._set_block(self._block.cached(force=force)) return self _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") diff --git a/bigframes/ml/core.py b/bigframes/ml/core.py index 7c156b4cb7..266ab1b058 100644 --- a/bigframes/ml/core.py +++ b/bigframes/ml/core.py @@ -247,9 +247,11 @@ def create_model( # Cache dataframes to make sure base table is not a snapshot # cached dataframe creates a full copy, never uses snapshot if y_train is None: - input_data = X_train._cached() + input_data = X_train._cached(force=True) else: - input_data = X_train._cached().join(y_train._cached(), how="outer") + input_data = X_train._cached(force=True).join( + y_train._cached(force=True), how="outer" + ) options.update({"INPUT_LABEL_COLS": y_train.columns.tolist()}) session = X_train._session @@ -281,7 +283,9 @@ def create_time_series_model( options = dict(options) # Cache dataframes to make sure base table is not a snapshot # cached dataframe creates a full copy, never uses snapshot - input_data = X_train._cached().join(y_train._cached(), how="outer") + input_data = X_train._cached(force=True).join( + y_train._cached(force=True), how="outer" + ) options.update({"TIME_SERIES_TIMESTAMP_COL": X_train.columns.tolist()[0]}) options.update({"TIME_SERIES_DATA_COL": y_train.columns.tolist()[0]}) diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index ba62ae28d2..aed05e287b 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -14,14 +14,13 @@ from __future__ import annotations +import abc +import dataclasses import typing - -from pandas import Int64Dtype -import pandas as pd - -import bigframes.dtypes as dtypes +from typing import ClassVar, Hashable, Optional, Tuple +@dataclasses.dataclass(frozen=True) class WindowOp: @property def skips_nulls(self): @@ -34,72 +33,110 @@ def handles_ties(self): return False +@dataclasses.dataclass(frozen=True) +class UnaryWindowOp(WindowOp): + @property + def arguments(self) -> int: + return 1 + + +@dataclasses.dataclass(frozen=True) class AggregateOp(WindowOp): - name = "abstract_aggregate" + """Aggregate ops can be applied with or without a window clause.""" + + @property + @abc.abstractmethod + def name(self) -> str: + ... + + @property + @abc.abstractmethod + def arguments(self) -> int: + ... + + +@dataclasses.dataclass(frozen=True) +class UnaryAggregateOp(AggregateOp, UnaryWindowOp): + @property + def arguments(self) -> int: + return 1 + + +@dataclasses.dataclass(frozen=True) +class BinaryAggregateOp(AggregateOp): + @property + def arguments(self) -> int: + return 2 + +@dataclasses.dataclass(frozen=True) +class SumOp(UnaryAggregateOp): + name: ClassVar[str] = "sum" -class SumOp(AggregateOp): - name = "sum" +@dataclasses.dataclass(frozen=True) +class MedianOp(UnaryAggregateOp): + name: ClassVar[str] = "median" -class MedianOp(AggregateOp): - name = "median" +@dataclasses.dataclass(frozen=True) +class ApproxQuartilesOp(UnaryAggregateOp): + quartile: int -class ApproxQuartilesOp(AggregateOp): - def __init__(self, quartile: int): - self.name = f"{quartile*25}%" - self._quartile = quartile + @property + def name(self): + return f"{self.quartile*25}%" -class MeanOp(AggregateOp): - name = "mean" +@dataclasses.dataclass(frozen=True) +class MeanOp(UnaryAggregateOp): + name: ClassVar[str] = "mean" -class ProductOp(AggregateOp): - name = "product" +@dataclasses.dataclass(frozen=True) +class ProductOp(UnaryAggregateOp): + name: ClassVar[str] = "product" -class MaxOp(AggregateOp): - name = "max" +@dataclasses.dataclass(frozen=True) +class MaxOp(UnaryAggregateOp): + name: ClassVar[str] = "max" -class MinOp(AggregateOp): - name = "min" +@dataclasses.dataclass(frozen=True) +class MinOp(UnaryAggregateOp): + name: ClassVar[str] = "min" -class StdOp(AggregateOp): - name = "std" +@dataclasses.dataclass(frozen=True) +class StdOp(UnaryAggregateOp): + name: ClassVar[str] = "std" -class VarOp(AggregateOp): - name = "var" +@dataclasses.dataclass(frozen=True) +class VarOp(UnaryAggregateOp): + name: ClassVar[str] = "var" -class PopVarOp(AggregateOp): - name = "popvar" +@dataclasses.dataclass(frozen=True) +class PopVarOp(UnaryAggregateOp): + name: ClassVar[str] = "popvar" -class CountOp(AggregateOp): - name = "count" +@dataclasses.dataclass(frozen=True) +class CountOp(UnaryAggregateOp): + name: ClassVar[str] = "count" @property def skips_nulls(self): return False -class CutOp(WindowOp): - def __init__(self, bins: typing.Union[int, pd.IntervalIndex], labels=None): - if isinstance(bins, int): - if not bins > 0: - raise ValueError("`bins` should be a positive integer.") - self._bins_int = bins - self._bins = dtypes.literal_to_ibis_scalar(bins, force_dtype=Int64Dtype()) - else: - self._bins_int = 0 - self._bins = bins - - self._labels = labels +@dataclasses.dataclass(frozen=True) +class CutOp(UnaryWindowOp): + # TODO: Unintuitive, refactor into multiple ops? + bins: typing.Union[int, Tuple[Tuple[Hashable, Hashable], ...]] + labels: Optional[bool] @property def skips_nulls(self): @@ -110,10 +147,13 @@ def handles_ties(self): return True -class QcutOp(WindowOp): - def __init__(self, quantiles: typing.Union[int, typing.Sequence[float]]): - self.name = f"qcut-{quantiles}" - self._quantiles = quantiles +@dataclasses.dataclass(frozen=True) +class QcutOp(UnaryWindowOp): + quantiles: typing.Union[int, typing.Tuple[float, ...]] + + @property + def name(self): + return f"qcut-{self.quantiles}" @property def skips_nulls(self): @@ -124,26 +164,29 @@ def handles_ties(self): return True -class NuniqueOp(AggregateOp): - name = "nunique" +@dataclasses.dataclass(frozen=True) +class NuniqueOp(UnaryAggregateOp): + name: ClassVar[str] = "nunique" @property def skips_nulls(self): return False -class AnyValueOp(AggregateOp): +@dataclasses.dataclass(frozen=True) +class AnyValueOp(UnaryAggregateOp): # Warning: only use if all values are equal. Non-deterministic otherwise. # Do not expose to users. For special cases only (e.g. pivot). - name = "any_value" + name: ClassVar[str] = "any_value" @property def skips_nulls(self): return True -class RankOp(WindowOp): - name = "rank" +@dataclasses.dataclass(frozen=True) +class RankOp(UnaryWindowOp): + name: ClassVar[str] = "rank" @property def skips_nulls(self): @@ -154,7 +197,8 @@ def handles_ties(self): return True -class DenseRankOp(WindowOp): +@dataclasses.dataclass(frozen=True) +class DenseRankOp(UnaryWindowOp): @property def skips_nulls(self): return False @@ -164,50 +208,61 @@ def handles_ties(self): return True -class FirstOp(WindowOp): - name = "first" +@dataclasses.dataclass(frozen=True) +class FirstOp(UnaryWindowOp): + name: ClassVar[str] = "first" -class FirstNonNullOp(WindowOp): +@dataclasses.dataclass(frozen=True) +class FirstNonNullOp(UnaryWindowOp): @property def skips_nulls(self): return False -class LastOp(WindowOp): - name = "last" +@dataclasses.dataclass(frozen=True) +class LastOp(UnaryWindowOp): + name: ClassVar[str] = "last" -class LastNonNullOp(WindowOp): +@dataclasses.dataclass(frozen=True) +class LastNonNullOp(UnaryWindowOp): @property def skips_nulls(self): return False -class ShiftOp(WindowOp): - def __init__(self, periods: int): - self._periods = periods +@dataclasses.dataclass(frozen=True) +class ShiftOp(UnaryWindowOp): + periods: int @property def skips_nulls(self): return False -class DiffOp(WindowOp): - def __init__(self, periods: int): - self._periods = periods +@dataclasses.dataclass(frozen=True) +class DiffOp(UnaryWindowOp): + periods: int @property def skips_nulls(self): return False -class AllOp(AggregateOp): - name = "all" +@dataclasses.dataclass(frozen=True) +class AllOp(UnaryAggregateOp): + name: ClassVar[str] = "all" + + +@dataclasses.dataclass(frozen=True) +class AnyOp(UnaryAggregateOp): + name: ClassVar[str] = "any" -class AnyOp(AggregateOp): - name = "any" +@dataclasses.dataclass(frozen=True) +class CorrOp(BinaryAggregateOp): + name: ClassVar[str] = "corr" sum_op = SumOp() @@ -228,7 +283,7 @@ class AnyOp(AggregateOp): # TODO: Alternative names and lookup from numpy function objects -_AGGREGATIONS_LOOKUP: dict[str, AggregateOp] = { +_AGGREGATIONS_LOOKUP: dict[str, UnaryAggregateOp] = { op.name: op for op in [ sum_op, @@ -250,7 +305,7 @@ class AnyOp(AggregateOp): } -def lookup_agg_func(key: str) -> AggregateOp: +def lookup_agg_func(key: str) -> UnaryAggregateOp: if callable(key): raise NotImplementedError( "Aggregating with callable object not supported, pass method name as string instead (eg. 'sum' instead of np.sum)." diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index 6829d3faab..4aad9479e7 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -65,8 +65,8 @@ def __init__( bf_index = indexes.Index(index) idx_block = bf_index._block idx_cols = idx_block.value_columns - block_idx, _ = idx_block.index.join(block.index, how="left") - block = block_idx._block.with_index_labels(bf_index.names) + block_idx, _ = idx_block.join(block, how="left") + block = block_idx.with_index_labels(bf_index.names) elif isinstance(data, indexes.Index): if data.nlevels != 1: @@ -78,10 +78,8 @@ def __init__( bf_index = indexes.Index(index) idx_block = bf_index._block.reset_index(drop=False) idx_cols = idx_block.value_columns - block_idx, (l_mapping, _) = idx_block.index.join( - block.index, how="left" - ) - block = block_idx._block.set_index([l_mapping[col] for col in idx_cols]) + block, (l_mapping, _) = idx_block.join(block, how="left") + block = block.set_index([l_mapping[col] for col in idx_cols]) block = block.with_index_labels(bf_index.names) if block: @@ -114,7 +112,7 @@ def __init__( if isinstance(dt, pd.ArrowDtype) ) ): - block = blocks.block_from_local(pd_dataframe) + block = blocks.Block.from_local(pd_dataframe) elif session: block = session.read_pandas(pd_dataframe)._get_block() else: @@ -214,15 +212,14 @@ def _align_n( block = self._block for other in others: if isinstance(other, series.Series): - combined_index, ( + block, ( get_column_left, get_column_right, - ) = block.index.join(other._block.index, how=how) + ) = block.join(other._block, how=how) value_ids = [ *[get_column_left[value] for value in value_ids], get_column_right[other._value_column], ] - block = combined_index._block else: # Will throw if can't interpret as scalar. dtype = typing.cast(bigframes.dtypes.Dtype, self._dtype) diff --git a/bigframes/series.py b/bigframes/series.py index 6a21727975..6167ce0966 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -183,7 +183,7 @@ def rename( block = self._block for k, v in index.items(): new_idx_ids = [] - for idx_id, idx_dtype in zip(block.index_columns, block.index_dtypes): + for idx_id, idx_dtype in zip(block.index_columns, block.index.dtypes): # Will throw if key type isn't compatible with index type, which leads to invalid SQL. block.create_constant(k, dtype=idx_dtype) @@ -199,7 +199,7 @@ def rename( new_idx_ids.append(new_idx_id) block = block.drop_columns([const_id, cond_id]) - block = block.set_index(new_idx_ids, index_labels=block.index_labels) + block = block.set_index(new_idx_ids, index_labels=block.index.names) return Series(block) @@ -259,6 +259,7 @@ def __repr__(self) -> str: if opts.repr_mode == "deferred": return formatter.repr_query_job(self.query_job) + self._cached() pandas_df, _, query_job = self._block.retrieve_repr_request_results(max_results) self._set_internal_query_job(query_job) @@ -368,7 +369,7 @@ def reorder_levels(self, order: LevelsType, axis: int | str = 0): return Series(self._block.reorder_levels(resolved_level_ids)) def _resolve_levels(self, level: LevelsType) -> typing.Sequence[str]: - return self._block.resolve_index_level(level) + return self._block.index.resolve_level(level) def between(self, left, right, inclusive="both"): if inclusive not in ["both", "neither", "left", "right"]: @@ -1044,7 +1045,7 @@ def _align3(self, other1: Series | scalars.Scalar, other2: Series | scalars.Scal values, index = self._align_n([other1, other2], how) return (values[0], values[1], values[2], index) - def _apply_aggregation(self, op: agg_ops.AggregateOp) -> Any: + def _apply_aggregation(self, op: agg_ops.UnaryAggregateOp) -> Any: return self._block.get_stat(self._value_column, op) def _apply_window_op( @@ -1179,19 +1180,16 @@ def _groupby_values( value_col = self._value_column for key in by: if isinstance(key, Series): - combined_index, ( + block, ( get_column_left, get_column_right, - ) = block.index.join( - key._block.index, how="inner" if dropna else "left" - ) + ) = block.join(key._block, how="inner" if dropna else "left") value_col = get_column_left[self._value_column] grouping_cols = [ *[get_column_left[value] for value in grouping_cols], get_column_right[key._value_column], ] - block = combined_index._block else: # Interpret as index level matches = block.index_name_to_col_id.get(key, []) @@ -1521,8 +1519,8 @@ def _slice( ), ) - def _cached(self) -> Series: - self._set_block(self._block.cached()) + def _cached(self, *, force: bool = True) -> Series: + self._set_block(self._block.cached(force=force)) return self diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index bd813c8c6b..15d4b3577b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -71,6 +71,7 @@ import bigframes.core.guid as guid from bigframes.core.ordering import IntegerEncoding, OrderingColumnReference import bigframes.core.ordering as orderings +import bigframes.core.traversal as traversals import bigframes.core.utils as utils import bigframes.dataframe as dataframe import bigframes.formatting_helpers as formatting_helpers @@ -1475,7 +1476,7 @@ def _start_query( results_iterator = query_job.result(max_results=max_results) return results_iterator, query_job - def _execute_and_cache( + def _cache_with_cluster_cols( self, array_value: core.ArrayValue, cluster_cols: typing.Sequence[str] ) -> core.ArrayValue: """Executes the query and uses the resulting table to rewrite future executions.""" @@ -1506,6 +1507,41 @@ def _execute_and_cache( ordering=compiled_value._ordering, ) + def _cache_with_offsets(self, array_value: core.ArrayValue) -> core.ArrayValue: + """Executes the query and uses the resulting table to rewrite future executions.""" + # TODO: Use this for all executions? Problem is that caching materializes extra + # ordering columns + compiled_value = self._compile_ordered(array_value) + + ibis_expr = compiled_value._to_ibis_expr( + ordering_mode="offset_col", order_col_name="bigframes_offsets" + ) + tmp_table = self._ibis_to_temp_table( + ibis_expr, cluster_cols=["bigframes_offsets"], api_name="cached" + ) + table_expression = self.ibis_client.table( + f"{tmp_table.project}.{tmp_table.dataset_id}.{tmp_table.table_id}" + ) + new_columns = [table_expression[column] for column in compiled_value.column_ids] + new_hidden_columns = [table_expression["bigframes_offsets"]] + # TODO: Instead, keep session-wide map of cached results and automatically reuse + return core.ArrayValue.from_ibis( + self, + table_expression, + columns=new_columns, + hidden_ordering_columns=new_hidden_columns, + ordering=orderings.ExpressionOrdering.from_offset_col("bigframes_offsets"), + ) + + def _is_trivially_executable(self, array_value: core.ArrayValue): + """ + Can the block be evaluated very cheaply? + If True, the array_value probably is not worth caching. + """ + # Once rewriting is available, will want to rewrite before + # evaluating execution cost. + return traversals.is_trivially_executable(array_value.node) + def _execute( self, array_value: core.ArrayValue, diff --git a/bigframes/version.py b/bigframes/version.py index 131f820e7d..fca3eec023 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.20.0" +__version__ = "0.20.1" diff --git a/samples/snippets/bqml_getting_started_test.py b/samples/snippets/bqml_getting_started_test.py index 783f963feb..bb282fa563 100644 --- a/samples/snippets/bqml_getting_started_test.py +++ b/samples/snippets/bqml_getting_started_test.py @@ -91,3 +91,78 @@ def test_bqml_getting_started(random_model_id): replace=True, ) # [END bigquery_dataframes_bqml_getting_started_tutorial] + + # [START bigquery_dataframes_bqml_getting_started_tutorial_evaluate] + import bigframes.pandas as bpd + + # Select model you'll use for training. `read_gbq_model` loads model data from a + # BigQuery, but you could also use the `model` object from the previous steps. + model = bpd.read_gbq_model( + your_model_id, # For example: "bqml_tutorial.sample_model", + ) + + # The WHERE clause — _TABLE_SUFFIX BETWEEN '20170701' AND '20170801' — + # limits the number of tables scanned by the query. The date range scanned is + # July 1, 2017 to August 1, 2017. This is the data you're using to evaluate the predictive performance + # of the model. It was collected in the month immediately following the time + # period spanned by the training data. + + df = bpd.read_gbq( + """ + SELECT GENERATE_UUID() AS rowindex, * + FROM + `bigquery-public-data.google_analytics_sample.ga_sessions_*` + WHERE + _TABLE_SUFFIX BETWEEN '20170701' AND '20170801' + """, + index_col="rowindex", + ) + transactions = df["totals"].struct.field("transactions") + label = transactions.notnull().map({True: 1, False: 0}) + operatingSystem = df["device"].struct.field("operatingSystem") + operatingSystem = operatingSystem.fillna("") + isMobile = df["device"].struct.field("isMobile") + country = df["geoNetwork"].struct.field("country").fillna("") + pageviews = df["totals"].struct.field("pageviews").fillna(0) + features = bpd.DataFrame( + { + "os": operatingSystem, + "is_mobile": isMobile, + "country": country, + "pageviews": pageviews, + } + ) + + # Some models include a convenient .score(X, y) method for evaluation with a preset accuracy metric: + + # Because you performed a logistic regression, the results include the following columns: + + # - precision — A metric for classification models. Precision identifies the frequency with + # which a model was correct when predicting the positive class. + + # - recall — A metric for classification models that answers the following question: + # Out of all the possible positive labels, how many did the model correctly identify? + + # - accuracy — Accuracy is the fraction of predictions that a classification model got right. + + # - f1_score — A measure of the accuracy of the model. The f1 score is the harmonic average of + # the precision and recall. An f1 score's best value is 1. The worst value is 0. + + # - log_loss — The loss function used in a logistic regression. This is the measure of how far the + # model's predictions are from the correct labels. + + # - roc_auc — The area under the ROC curve. This is the probability that a classifier is more confident that + # a randomly chosen positive example + # is actually positive than that a randomly chosen negative example is positive. For more information, + # see ['Classification']('https://developers.google.com/machine-learning/crash-course/classification/video-lecture') + # in the Machine Learning Crash Course. + + model.score(features, label) + # precision recall accuracy f1_score log_loss roc_auc + # 0 0.412621 0.079143 0.985074 0.132812 0.049764 0.974285 + # [1 rows x 6 columns] + # [END bigquery_dataframes_bqml_getting_started_tutorial_evaluate] + + # [START bigquery_dataframes_bqml_getting_started_tutorial_predict] + + # [END bigquery_dataframes_bqml_getting_started_tutorial_predict] diff --git a/tests/data/nested.jsonl b/tests/data/nested.jsonl new file mode 100644 index 0000000000..a71e9b1db1 --- /dev/null +++ b/tests/data/nested.jsonl @@ -0,0 +1,100 @@ +{"rowindex":0,"customer_id":"jkl","day":"2023-12-18","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-18 03:43:58","data":[{"key":"x","value":20.2533015856},{"key":"y","value":42.8363462389}]},{"category":"D","timestamp":"2023-12-18 07:15:37","data":[{"key":"x","value":62.0762664928},{"key":"z","value":83.6655402432}]}]} +{"rowindex":1,"customer_id":"def","day":"2023-12-18","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-18 23:11:11","data":[{"key":"w","value":36.1388065179}]},{"category":"B","timestamp":"2023-12-18 07:12:50","data":[{"key":"z","value":68.7673488304}]},{"category":"D","timestamp":"2023-12-18 09:09:03","data":[{"key":"x","value":57.4139647019}]},{"category":"C","timestamp":"2023-12-18 13:05:30","data":[{"key":"z","value":36.087871201}]}]} +{"rowindex":2,"customer_id":"abc","day":"2023-12-6","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-06 10:37:11","data":[]},{"category":"A","timestamp":"2023-12-06 03:35:44","data":[]},{"category":"D","timestamp":"2023-12-06 13:10:57","data":[{"key":"z","value":21.8487807658}]},{"category":"B","timestamp":"2023-12-06 01:39:16","data":[{"key":"y","value":1.6380505139}]}]} +{"rowindex":3,"customer_id":"mno","day":"2023-12-16","flag":2,"event_sequence":[]} +{"rowindex":4,"customer_id":"jkl","day":"2023-12-1","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-01 22:29:35","data":[]}]} +{"rowindex":5,"customer_id":"mno","day":"2023-12-8","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-08 19:56:43","data":[{"key":"z","value":64.0025360397}]},{"category":"A","timestamp":"2023-12-08 00:43:53","data":[{"key":"z","value":62.5030923507},{"key":"y","value":67.4517590972}]}]} +{"rowindex":6,"customer_id":"abc","day":"2023-12-3","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-03 10:04:48","data":[{"key":"x","value":73.0494929425},{"key":"z","value":81.1761568104}]}]} +{"rowindex":7,"customer_id":"abc","day":"2023-12-6","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-06 16:50:15","data":[{"key":"w","value":46.395435162},{"key":"y","value":7.8421775851}]},{"category":"A","timestamp":"2023-12-06 05:55:01","data":[]},{"category":"B","timestamp":"2023-12-06 15:24:08","data":[{"key":"x","value":37.5351196265},{"key":"w","value":65.4896295524}]}]} +{"rowindex":8,"customer_id":"jkl","day":"2023-12-8","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-08 00:21:23","data":[{"key":"w","value":42.4467608939},{"key":"x","value":81.083558253}]},{"category":"A","timestamp":"2023-12-08 09:31:05","data":[]},{"category":"C","timestamp":"2023-12-08 01:42:37","data":[{"key":"y","value":55.1881250973}]},{"category":"C","timestamp":"2023-12-08 21:14:46","data":[{"key":"z","value":12.0833253151}]},{"category":"D","timestamp":"2023-12-08 21:38:25","data":[{"key":"y","value":59.9482432021}]}]} +{"rowindex":9,"customer_id":"jkl","day":"2023-12-5","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-05 09:46:09","data":[{"key":"w","value":48.5204042398}]},{"category":"C","timestamp":"2023-12-05 03:44:30","data":[{"key":"y","value":49.3712140658}]}]} +{"rowindex":10,"customer_id":"mno","day":"2023-12-1","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-01 00:53:03","data":[{"key":"w","value":19.1753301515},{"key":"z","value":90.1966084522}]},{"category":"B","timestamp":"2023-12-01 15:18:15","data":[{"key":"w","value":28.4831052842},{"key":"y","value":74.3676328239}]},{"category":"D","timestamp":"2023-12-01 18:35:06","data":[{"key":"w","value":50.9000130431}]},{"category":"A","timestamp":"2023-12-01 19:10:15","data":[{"key":"x","value":36.4073472229},{"key":"y","value":2.5800142072}]}]} +{"rowindex":11,"customer_id":"abc","day":"2023-12-7","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-07 03:28:37","data":[]},{"category":"D","timestamp":"2023-12-07 03:00:47","data":[{"key":"z","value":42.5078083149},{"key":"w","value":0.3430387149}]}]} +{"rowindex":12,"customer_id":"jkl","day":"2023-12-16","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-16 20:28:48","data":[{"key":"y","value":99.4511527722}]}]} +{"rowindex":13,"customer_id":"ghi","day":"2023-12-18","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-18 00:35:24","data":[{"key":"w","value":30.3520969504}]},{"category":"B","timestamp":"2023-12-18 10:45:35","data":[]},{"category":"C","timestamp":"2023-12-18 18:39:11","data":[{"key":"z","value":93.486287241}]},{"category":"C","timestamp":"2023-12-18 18:55:30","data":[{"key":"y","value":20.2247125873}]}]} +{"rowindex":14,"customer_id":"abc","day":"2023-12-14","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-14 04:48:13","data":[]},{"category":"B","timestamp":"2023-12-14 07:39:40","data":[]},{"category":"D","timestamp":"2023-12-14 22:08:13","data":[{"key":"x","value":31.3054147446},{"key":"y","value":32.9881809276}]},{"category":"A","timestamp":"2023-12-14 23:02:18","data":[{"key":"x","value":41.4514710087},{"key":"w","value":71.0759384863}]}]} +{"rowindex":15,"customer_id":"def","day":"2023-12-14","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-14 18:34:07","data":[{"key":"w","value":82.4015077053},{"key":"x","value":80.8508070787}]},{"category":"B","timestamp":"2023-12-14 10:08:52","data":[{"key":"y","value":91.3558143519},{"key":"w","value":42.8103570355}]}]} +{"rowindex":16,"customer_id":"mno","day":"2023-12-7","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-07 07:07:38","data":[]},{"category":"A","timestamp":"2023-12-07 03:39:27","data":[{"key":"w","value":25.6141348288}]}]} +{"rowindex":17,"customer_id":"mno","day":"2023-12-18","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-18 22:24:48","data":[{"key":"y","value":81.207759202}]}]} +{"rowindex":18,"customer_id":"ghi","day":"2023-12-13","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-13 16:26:05","data":[{"key":"y","value":30.6921921236}]},{"category":"C","timestamp":"2023-12-13 15:00:10","data":[{"key":"x","value":73.8609954622}]}]} +{"rowindex":19,"customer_id":"abc","day":"2023-12-7","flag":2,"event_sequence":[]} +{"rowindex":20,"customer_id":"jkl","day":"2023-12-17","flag":0,"event_sequence":[]} +{"rowindex":21,"customer_id":"mno","day":"2023-12-14","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-14 04:13:58","data":[{"key":"w","value":86.1548312989}]},{"category":"D","timestamp":"2023-12-14 15:39:43","data":[{"key":"w","value":40.0214161212}]},{"category":"B","timestamp":"2023-12-14 19:35:33","data":[{"key":"z","value":67.4152417129}]},{"category":"D","timestamp":"2023-12-14 17:20:20","data":[]},{"category":"C","timestamp":"2023-12-14 00:10:29","data":[{"key":"z","value":56.6529579965},{"key":"y","value":52.1273353535}]}]} +{"rowindex":22,"customer_id":"mno","day":"2023-12-8","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-08 13:34:42","data":[{"key":"w","value":95.9950956489},{"key":"y","value":73.9478601628}]}]} +{"rowindex":23,"customer_id":"def","day":"2023-12-17","flag":1,"event_sequence":[{"category":"A","timestamp":"2023-12-17 10:07:16","data":[{"key":"x","value":66.1044798274}]},{"category":"B","timestamp":"2023-12-17 14:33:42","data":[{"key":"z","value":77.4267396836}]},{"category":"B","timestamp":"2023-12-17 11:54:45","data":[]}]} +{"rowindex":24,"customer_id":"def","day":"2023-12-17","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-17 21:02:08","data":[{"key":"y","value":70.9945354474}]}]} +{"rowindex":25,"customer_id":"ghi","day":"2023-12-2","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-02 01:17:39","data":[]},{"category":"B","timestamp":"2023-12-02 13:54:33","data":[{"key":"w","value":49.7485944905},{"key":"x","value":12.3938168348}]},{"category":"B","timestamp":"2023-12-02 02:30:14","data":[]},{"category":"C","timestamp":"2023-12-02 13:16:54","data":[{"key":"x","value":52.0455905555},{"key":"y","value":13.1107332474}]},{"category":"A","timestamp":"2023-12-02 23:10:23","data":[{"key":"w","value":73.5827155332}]}]} +{"rowindex":26,"customer_id":"def","day":"2023-12-1","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-01 10:01:13","data":[]}]} +{"rowindex":27,"customer_id":"mno","day":"2023-12-10","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-10 11:07:58","data":[{"key":"y","value":41.8327013256},{"key":"w","value":59.4445826737}]},{"category":"C","timestamp":"2023-12-10 01:35:25","data":[{"key":"z","value":98.4395840749}]}]} +{"rowindex":28,"customer_id":"def","day":"2023-12-4","flag":0,"event_sequence":[{"category":"B","timestamp":"2023-12-04 13:27:56","data":[]},{"category":"D","timestamp":"2023-12-04 07:29:29","data":[]},{"category":"C","timestamp":"2023-12-04 15:50:42","data":[]},{"category":"C","timestamp":"2023-12-04 21:14:39","data":[{"key":"x","value":87.2090409333},{"key":"z","value":67.873124445}]},{"category":"A","timestamp":"2023-12-04 10:22:07","data":[]}]} +{"rowindex":29,"customer_id":"abc","day":"2023-12-6","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-06 13:03:19","data":[{"key":"y","value":64.2584716378},{"key":"w","value":17.4653120122}]},{"category":"A","timestamp":"2023-12-06 06:10:03","data":[{"key":"w","value":93.696003482},{"key":"y","value":0.675474038}]},{"category":"B","timestamp":"2023-12-06 10:10:08","data":[]},{"category":"C","timestamp":"2023-12-06 06:48:30","data":[]},{"category":"B","timestamp":"2023-12-06 23:00:42","data":[{"key":"x","value":65.1766190228}]}]} +{"rowindex":30,"customer_id":"abc","day":"2023-12-1","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-01 03:17:48","data":[]},{"category":"A","timestamp":"2023-12-01 19:59:32","data":[]},{"category":"C","timestamp":"2023-12-01 02:16:52","data":[]}]} +{"rowindex":31,"customer_id":"jkl","day":"2023-12-2","flag":0,"event_sequence":[{"category":"A","timestamp":"2023-12-02 13:13:21","data":[{"key":"y","value":85.6919195342}]},{"category":"C","timestamp":"2023-12-02 06:32:12","data":[{"key":"y","value":72.2526437761},{"key":"x","value":62.1668944755}]},{"category":"D","timestamp":"2023-12-02 01:49:25","data":[{"key":"z","value":13.5820871569}]},{"category":"A","timestamp":"2023-12-02 21:30:07","data":[{"key":"x","value":33.6063239173},{"key":"z","value":93.896859174}]},{"category":"C","timestamp":"2023-12-02 07:03:10","data":[{"key":"w","value":95.2222323306},{"key":"x","value":8.4438153156}]}]} +{"rowindex":32,"customer_id":"def","day":"2023-12-9","flag":1,"event_sequence":[]} +{"rowindex":33,"customer_id":"def","day":"2023-12-4","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-04 18:03:13","data":[{"key":"x","value":87.3759936085}]},{"category":"C","timestamp":"2023-12-04 12:23:33","data":[{"key":"x","value":7.6663438235}]},{"category":"D","timestamp":"2023-12-04 23:16:12","data":[{"key":"x","value":42.6682526335}]}]} +{"rowindex":34,"customer_id":"ghi","day":"2023-12-2","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-02 22:48:04","data":[]},{"category":"A","timestamp":"2023-12-02 06:52:49","data":[{"key":"x","value":53.7008853605}]},{"category":"D","timestamp":"2023-12-02 21:35:43","data":[{"key":"w","value":65.7972882681}]},{"category":"D","timestamp":"2023-12-02 04:22:32","data":[{"key":"x","value":8.0812633272}]},{"category":"D","timestamp":"2023-12-02 04:53:36","data":[]}]} +{"rowindex":35,"customer_id":"ghi","day":"2023-12-18","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-18 07:51:07","data":[]},{"category":"C","timestamp":"2023-12-18 23:09:23","data":[{"key":"x","value":36.7126625188},{"key":"z","value":7.3234058497}]}]} +{"rowindex":36,"customer_id":"ghi","day":"2023-12-11","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-11 17:36:44","data":[{"key":"y","value":72.5462499934},{"key":"x","value":40.7042156894}]},{"category":"C","timestamp":"2023-12-11 19:58:01","data":[{"key":"x","value":88.553115143},{"key":"w","value":16.5083749137}]},{"category":"C","timestamp":"2023-12-11 00:22:58","data":[{"key":"y","value":13.7684351079}]},{"category":"A","timestamp":"2023-12-11 06:52:46","data":[{"key":"x","value":82.6970048317}]}]} +{"rowindex":37,"customer_id":"jkl","day":"2023-12-1","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-01 12:41:41","data":[]},{"category":"D","timestamp":"2023-12-01 05:37:51","data":[]},{"category":"C","timestamp":"2023-12-01 07:50:54","data":[{"key":"y","value":79.7821140254},{"key":"w","value":55.1183743775}]},{"category":"A","timestamp":"2023-12-01 16:23:25","data":[]}]} +{"rowindex":38,"customer_id":"abc","day":"2023-12-15","flag":0,"event_sequence":[{"category":"B","timestamp":"2023-12-15 15:45:21","data":[]},{"category":"D","timestamp":"2023-12-15 05:40:05","data":[{"key":"z","value":84.4372711239}]},{"category":"C","timestamp":"2023-12-15 18:54:07","data":[]},{"category":"C","timestamp":"2023-12-15 01:34:35","data":[{"key":"x","value":57.6043137776},{"key":"y","value":2.0915421039}]}]} +{"rowindex":39,"customer_id":"ghi","day":"2023-12-2","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-02 17:31:07","data":[]},{"category":"A","timestamp":"2023-12-02 14:09:19","data":[]},{"category":"A","timestamp":"2023-12-02 19:47:26","data":[{"key":"y","value":40.4981578761}]}]} +{"rowindex":40,"customer_id":"abc","day":"2023-12-17","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-17 14:54:26","data":[]}]} +{"rowindex":41,"customer_id":"def","day":"2023-12-8","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-08 03:29:13","data":[{"key":"w","value":22.3551385464}]},{"category":"A","timestamp":"2023-12-08 18:11:55","data":[]}]} +{"rowindex":42,"customer_id":"ghi","day":"2023-12-2","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-02 13:34:00","data":[{"key":"w","value":32.2914731904},{"key":"z","value":1.667821995}]},{"category":"C","timestamp":"2023-12-02 16:27:30","data":[]},{"category":"D","timestamp":"2023-12-02 05:53:11","data":[]},{"category":"C","timestamp":"2023-12-02 06:36:55","data":[{"key":"z","value":17.1648556861},{"key":"y","value":68.34850499}]}]} +{"rowindex":43,"customer_id":"ghi","day":"2023-12-11","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-11 08:23:53","data":[{"key":"x","value":44.2005886027}]},{"category":"B","timestamp":"2023-12-11 07:45:41","data":[{"key":"w","value":77.6941452877},{"key":"z","value":51.1968046092}]},{"category":"B","timestamp":"2023-12-11 11:58:25","data":[{"key":"y","value":68.1363704094}]},{"category":"C","timestamp":"2023-12-11 22:13:57","data":[{"key":"z","value":58.1763854177}]},{"category":"C","timestamp":"2023-12-11 09:13:08","data":[]}]} +{"rowindex":44,"customer_id":"def","day":"2023-12-12","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-12 11:38:27","data":[{"key":"y","value":89.3301425129},{"key":"w","value":39.419946238}]}]} +{"rowindex":45,"customer_id":"mno","day":"2023-12-14","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-14 13:26:53","data":[{"key":"z","value":76.4355996198}]},{"category":"D","timestamp":"2023-12-14 02:51:25","data":[]},{"category":"D","timestamp":"2023-12-14 16:06:20","data":[]}]} +{"rowindex":46,"customer_id":"mno","day":"2023-12-18","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-18 16:52:35","data":[{"key":"y","value":92.8314533492}]},{"category":"A","timestamp":"2023-12-18 18:55:16","data":[]},{"category":"A","timestamp":"2023-12-18 11:48:11","data":[]}]} +{"rowindex":47,"customer_id":"ghi","day":"2023-12-5","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-05 18:00:29","data":[{"key":"w","value":4.1194443596},{"key":"y","value":90.9907980881}]},{"category":"C","timestamp":"2023-12-05 18:28:30","data":[]},{"category":"C","timestamp":"2023-12-05 01:23:53","data":[]},{"category":"B","timestamp":"2023-12-05 09:30:53","data":[]}]} +{"rowindex":48,"customer_id":"jkl","day":"2023-12-4","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-04 00:00:57","data":[{"key":"x","value":54.1860622721},{"key":"z","value":21.9039040875}]},{"category":"C","timestamp":"2023-12-04 03:47:29","data":[{"key":"z","value":10.1626962952},{"key":"y","value":80.2137857017}]},{"category":"C","timestamp":"2023-12-04 09:38:59","data":[{"key":"y","value":41.4002343854},{"key":"x","value":2.5915025309}]},{"category":"D","timestamp":"2023-12-04 10:26:10","data":[{"key":"y","value":78.3790791291},{"key":"z","value":21.0205345948}]}]} +{"rowindex":49,"customer_id":"jkl","day":"2023-12-11","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-11 00:56:27","data":[]},{"category":"C","timestamp":"2023-12-11 00:00:49","data":[]},{"category":"A","timestamp":"2023-12-11 06:51:01","data":[]},{"category":"B","timestamp":"2023-12-11 15:03:31","data":[{"key":"w","value":11.4068443366}]},{"category":"A","timestamp":"2023-12-11 06:51:26","data":[{"key":"x","value":16.6716464506},{"key":"w","value":12.3375298466}]}]} +{"rowindex":50,"customer_id":"jkl","day":"2023-12-7","flag":0,"event_sequence":[]} +{"rowindex":51,"customer_id":"jkl","day":"2023-12-16","flag":1,"event_sequence":[]} +{"rowindex":52,"customer_id":"mno","day":"2023-12-8","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-08 13:38:34","data":[{"key":"y","value":89.16823262}]},{"category":"B","timestamp":"2023-12-08 21:42:37","data":[{"key":"z","value":49.2264719354},{"key":"w","value":71.3471924749}]},{"category":"B","timestamp":"2023-12-08 11:20:22","data":[]}]} +{"rowindex":53,"customer_id":"ghi","day":"2023-12-18","flag":0,"event_sequence":[]} +{"rowindex":54,"customer_id":"def","day":"2023-12-14","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-14 15:18:52","data":[{"key":"x","value":10.7255724898}]},{"category":"C","timestamp":"2023-12-14 00:16:13","data":[{"key":"x","value":81.6578442509},{"key":"z","value":97.6343706241}]},{"category":"A","timestamp":"2023-12-14 15:17:47","data":[{"key":"z","value":61.0727156569},{"key":"y","value":68.5047229429}]}]} +{"rowindex":55,"customer_id":"def","day":"2023-12-17","flag":0,"event_sequence":[{"category":"A","timestamp":"2023-12-17 08:09:37","data":[{"key":"x","value":96.7880530276}]},{"category":"C","timestamp":"2023-12-17 17:45:03","data":[{"key":"x","value":89.261752039}]},{"category":"B","timestamp":"2023-12-17 23:34:55","data":[{"key":"x","value":56.6947696032},{"key":"y","value":39.2160698568}]}]} +{"rowindex":56,"customer_id":"abc","day":"2023-12-3","flag":1,"event_sequence":[{"category":"C","timestamp":"2023-12-03 16:36:33","data":[{"key":"w","value":31.3842474288},{"key":"y","value":70.0883222713}]},{"category":"A","timestamp":"2023-12-03 23:14:03","data":[{"key":"z","value":2.241181478},{"key":"x","value":33.4155024672}]},{"category":"C","timestamp":"2023-12-03 02:59:20","data":[{"key":"w","value":30.325598456},{"key":"y","value":43.6801994079}]},{"category":"A","timestamp":"2023-12-03 17:25:12","data":[]}]} +{"rowindex":57,"customer_id":"jkl","day":"2023-12-18","flag":0,"event_sequence":[{"category":"B","timestamp":"2023-12-18 02:36:06","data":[{"key":"y","value":59.5978119693},{"key":"w","value":50.0596752663}]},{"category":"A","timestamp":"2023-12-18 22:15:26","data":[{"key":"y","value":46.7811589523},{"key":"z","value":17.5305458954}]},{"category":"B","timestamp":"2023-12-18 10:46:35","data":[{"key":"y","value":17.5499211188}]}]} +{"rowindex":58,"customer_id":"jkl","day":"2023-12-11","flag":1,"event_sequence":[{"category":"A","timestamp":"2023-12-11 08:08:24","data":[]},{"category":"A","timestamp":"2023-12-11 14:37:12","data":[{"key":"z","value":85.2678327892}]},{"category":"A","timestamp":"2023-12-11 14:11:26","data":[]},{"category":"A","timestamp":"2023-12-11 09:15:19","data":[]},{"category":"A","timestamp":"2023-12-11 13:29:27","data":[]}]} +{"rowindex":59,"customer_id":"mno","day":"2023-12-18","flag":2,"event_sequence":[]} +{"rowindex":60,"customer_id":"def","day":"2023-12-15","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-15 22:31:56","data":[{"key":"x","value":69.3286635086},{"key":"z","value":41.2999550449}]},{"category":"D","timestamp":"2023-12-15 22:30:05","data":[]},{"category":"B","timestamp":"2023-12-15 13:52:17","data":[{"key":"z","value":37.8991532333},{"key":"y","value":69.1381526165}]}]} +{"rowindex":61,"customer_id":"jkl","day":"2023-12-6","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-06 16:19:12","data":[{"key":"w","value":83.7533903572},{"key":"x","value":72.0796689391}]}]} +{"rowindex":62,"customer_id":"ghi","day":"2023-12-13","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-13 19:35:45","data":[{"key":"y","value":9.7338091747}]},{"category":"B","timestamp":"2023-12-13 04:27:13","data":[{"key":"x","value":77.5851696223},{"key":"y","value":44.6396928116}]},{"category":"B","timestamp":"2023-12-13 14:21:37","data":[{"key":"z","value":62.6243288556}]},{"category":"C","timestamp":"2023-12-13 09:43:52","data":[{"key":"y","value":96.4384908625}]}]} +{"rowindex":63,"customer_id":"def","day":"2023-12-14","flag":1,"event_sequence":[{"category":"A","timestamp":"2023-12-14 10:49:52","data":[{"key":"x","value":47.2768901655},{"key":"y","value":31.4990167429}]},{"category":"B","timestamp":"2023-12-14 13:00:17","data":[{"key":"y","value":47.1290340032},{"key":"x","value":63.4631919376}]},{"category":"A","timestamp":"2023-12-14 22:12:52","data":[]},{"category":"A","timestamp":"2023-12-14 06:31:57","data":[]},{"category":"A","timestamp":"2023-12-14 03:46:03","data":[]}]} +{"rowindex":64,"customer_id":"mno","day":"2023-12-9","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-09 10:04:27","data":[{"key":"y","value":67.6773976982},{"key":"w","value":30.3681543638}]},{"category":"D","timestamp":"2023-12-09 06:31:47","data":[]}]} +{"rowindex":65,"customer_id":"mno","day":"2023-12-4","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-04 03:30:32","data":[]},{"category":"B","timestamp":"2023-12-04 05:04:06","data":[{"key":"x","value":21.382181381}]}]} +{"rowindex":66,"customer_id":"mno","day":"2023-12-9","flag":0,"event_sequence":[]} +{"rowindex":67,"customer_id":"def","day":"2023-12-18","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-18 15:06:18","data":[{"key":"w","value":22.8608042274}]}]} +{"rowindex":68,"customer_id":"mno","day":"2023-12-2","flag":0,"event_sequence":[{"category":"A","timestamp":"2023-12-02 20:31:02","data":[{"key":"z","value":91.6471682783}]}]} +{"rowindex":69,"customer_id":"def","day":"2023-12-1","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-01 18:23:24","data":[]},{"category":"B","timestamp":"2023-12-01 03:38:19","data":[{"key":"z","value":77.6426948721}]},{"category":"D","timestamp":"2023-12-01 02:53:39","data":[]},{"category":"D","timestamp":"2023-12-01 01:16:05","data":[{"key":"x","value":4.1829224252}]}]} +{"rowindex":70,"customer_id":"ghi","day":"2023-12-2","flag":1,"event_sequence":[{"category":"A","timestamp":"2023-12-02 18:53:51","data":[]},{"category":"A","timestamp":"2023-12-02 11:05:50","data":[{"key":"z","value":41.8070964998}]},{"category":"B","timestamp":"2023-12-02 06:32:35","data":[]},{"category":"B","timestamp":"2023-12-02 07:03:09","data":[{"key":"x","value":73.1611243111}]}]} +{"rowindex":71,"customer_id":"ghi","day":"2023-12-9","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-09 04:54:59","data":[{"key":"x","value":85.2320581103}]},{"category":"B","timestamp":"2023-12-09 15:11:55","data":[]},{"category":"D","timestamp":"2023-12-09 16:21:45","data":[]},{"category":"B","timestamp":"2023-12-09 06:03:32","data":[{"key":"w","value":69.0663696235}]},{"category":"C","timestamp":"2023-12-09 02:48:41","data":[{"key":"y","value":13.3980977494}]}]} +{"rowindex":72,"customer_id":"abc","day":"2023-12-14","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-14 15:56:22","data":[]},{"category":"D","timestamp":"2023-12-14 06:48:33","data":[{"key":"y","value":36.2141968443},{"key":"z","value":95.4467019984}]}]} +{"rowindex":73,"customer_id":"mno","day":"2023-12-13","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-13 10:19:12","data":[]}]} +{"rowindex":74,"customer_id":"def","day":"2023-12-3","flag":0,"event_sequence":[]} +{"rowindex":75,"customer_id":"abc","day":"2023-12-13","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-13 19:07:09","data":[{"key":"w","value":18.8470628926},{"key":"z","value":88.20939594}]}]} +{"rowindex":76,"customer_id":"ghi","day":"2023-12-8","flag":0,"event_sequence":[{"category":"D","timestamp":"2023-12-08 15:22:08","data":[]},{"category":"C","timestamp":"2023-12-08 16:51:43","data":[{"key":"w","value":79.5244986146}]},{"category":"C","timestamp":"2023-12-08 03:12:25","data":[{"key":"w","value":56.6377952915},{"key":"z","value":42.3533060413}]}]} +{"rowindex":77,"customer_id":"jkl","day":"2023-12-13","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-13 12:14:14","data":[{"key":"w","value":35.2592201371},{"key":"y","value":13.5684896571}]}]} +{"rowindex":78,"customer_id":"abc","day":"2023-12-5","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-05 19:22:58","data":[{"key":"z","value":66.2843566224}]},{"category":"B","timestamp":"2023-12-05 19:39:08","data":[{"key":"w","value":34.080531438}]},{"category":"C","timestamp":"2023-12-05 02:53:05","data":[{"key":"z","value":33.991374759},{"key":"x","value":80.0208062703}]},{"category":"D","timestamp":"2023-12-05 13:30:43","data":[{"key":"y","value":67.1306733907}]},{"category":"A","timestamp":"2023-12-05 00:51:36","data":[{"key":"w","value":17.3844088301}]}]} +{"rowindex":79,"customer_id":"mno","day":"2023-12-9","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-09 10:36:18","data":[{"key":"y","value":17.9861379377},{"key":"x","value":31.1422706226}]},{"category":"A","timestamp":"2023-12-09 19:04:16","data":[]},{"category":"C","timestamp":"2023-12-09 23:46:25","data":[]},{"category":"B","timestamp":"2023-12-09 15:08:37","data":[]}]} +{"rowindex":80,"customer_id":"mno","day":"2023-12-11","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-11 23:50:20","data":[]},{"category":"A","timestamp":"2023-12-11 13:45:37","data":[{"key":"y","value":34.1896555846},{"key":"z","value":54.8455987136}]},{"category":"D","timestamp":"2023-12-11 05:27:06","data":[{"key":"z","value":8.6439113664},{"key":"w","value":57.8679152847}]},{"category":"A","timestamp":"2023-12-11 22:56:07","data":[]},{"category":"D","timestamp":"2023-12-11 01:09:13","data":[{"key":"x","value":94.8088772326},{"key":"y","value":92.9817038325}]}]} +{"rowindex":81,"customer_id":"mno","day":"2023-12-2","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-02 19:13:55","data":[{"key":"x","value":92.6140550812},{"key":"y","value":21.6844233156}]},{"category":"A","timestamp":"2023-12-02 10:19:54","data":[{"key":"z","value":96.1332346043},{"key":"y","value":12.3365763983}]},{"category":"C","timestamp":"2023-12-02 23:15:36","data":[]}]} +{"rowindex":82,"customer_id":"def","day":"2023-12-8","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-08 03:43:45","data":[{"key":"z","value":39.7558930693}]},{"category":"A","timestamp":"2023-12-08 01:35:47","data":[]},{"category":"D","timestamp":"2023-12-08 04:53:02","data":[{"key":"x","value":3.1323563783}]},{"category":"B","timestamp":"2023-12-08 01:12:21","data":[{"key":"w","value":21.6503102051},{"key":"y","value":43.4536696853}]},{"category":"B","timestamp":"2023-12-08 01:57:25","data":[{"key":"z","value":11.3705979892},{"key":"y","value":85.3671308445}]}]} +{"rowindex":83,"customer_id":"mno","day":"2023-12-16","flag":0,"event_sequence":[]} +{"rowindex":84,"customer_id":"def","day":"2023-12-13","flag":0,"event_sequence":[]} +{"rowindex":85,"customer_id":"jkl","day":"2023-12-6","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-06 18:06:14","data":[{"key":"w","value":75.6475285669},{"key":"y","value":92.2341481081}]},{"category":"B","timestamp":"2023-12-06 15:28:32","data":[]},{"category":"B","timestamp":"2023-12-06 19:45:52","data":[]},{"category":"C","timestamp":"2023-12-06 08:32:52","data":[]},{"category":"A","timestamp":"2023-12-06 17:32:37","data":[{"key":"y","value":80.2305875735}]}]} +{"rowindex":86,"customer_id":"abc","day":"2023-12-10","flag":2,"event_sequence":[{"category":"A","timestamp":"2023-12-10 09:34:20","data":[{"key":"y","value":10.7693525828},{"key":"w","value":81.4922282197}]},{"category":"C","timestamp":"2023-12-10 03:58:48","data":[{"key":"y","value":75.2926863125},{"key":"x","value":14.3834415502}]},{"category":"A","timestamp":"2023-12-10 09:09:24","data":[{"key":"z","value":26.1964055176},{"key":"w","value":33.2590307936}]},{"category":"C","timestamp":"2023-12-10 07:53:33","data":[{"key":"z","value":23.2141532358}]}]} +{"rowindex":87,"customer_id":"ghi","day":"2023-12-6","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-06 17:59:34","data":[]},{"category":"B","timestamp":"2023-12-06 15:30:32","data":[{"key":"z","value":65.5093670838}]},{"category":"C","timestamp":"2023-12-06 11:40:36","data":[{"key":"z","value":19.0969232242}]},{"category":"C","timestamp":"2023-12-06 23:24:48","data":[{"key":"w","value":41.7328593069}]}]} +{"rowindex":88,"customer_id":"mno","day":"2023-12-4","flag":2,"event_sequence":[{"category":"D","timestamp":"2023-12-04 09:37:49","data":[{"key":"z","value":73.115183578},{"key":"w","value":55.409641057}]},{"category":"A","timestamp":"2023-12-04 20:25:06","data":[{"key":"x","value":68.225517069}]},{"category":"C","timestamp":"2023-12-04 02:46:08","data":[]},{"category":"A","timestamp":"2023-12-04 06:18:04","data":[{"key":"x","value":95.7957065313},{"key":"y","value":68.2634789529}]}]} +{"rowindex":89,"customer_id":"ghi","day":"2023-12-3","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-03 07:02:27","data":[{"key":"w","value":17.0165951832}]},{"category":"D","timestamp":"2023-12-03 19:06:20","data":[]},{"category":"A","timestamp":"2023-12-03 17:50:14","data":[{"key":"x","value":4.3834633659},{"key":"z","value":84.6024255445}]},{"category":"C","timestamp":"2023-12-03 06:51:03","data":[]}]} +{"rowindex":90,"customer_id":"mno","day":"2023-12-12","flag":2,"event_sequence":[{"category":"B","timestamp":"2023-12-12 21:57:25","data":[{"key":"y","value":95.5058021347}]},{"category":"C","timestamp":"2023-12-12 07:24:27","data":[{"key":"z","value":17.9587475242}]},{"category":"A","timestamp":"2023-12-12 10:35:52","data":[{"key":"z","value":55.194876676}]},{"category":"D","timestamp":"2023-12-12 23:44:14","data":[{"key":"w","value":24.6177835891}]},{"category":"D","timestamp":"2023-12-12 16:09:40","data":[{"key":"y","value":32.2627525342},{"key":"x","value":77.4276051497}]}]} +{"rowindex":91,"customer_id":"abc","day":"2023-12-18","flag":0,"event_sequence":[]} +{"rowindex":92,"customer_id":"jkl","day":"2023-12-7","flag":0,"event_sequence":[{"category":"C","timestamp":"2023-12-07 11:36:31","data":[{"key":"w","value":70.4689420724}]},{"category":"A","timestamp":"2023-12-07 09:18:26","data":[{"key":"z","value":31.0551928628},{"key":"y","value":4.7472634353}]},{"category":"D","timestamp":"2023-12-07 05:44:09","data":[{"key":"z","value":37.7906214595},{"key":"w","value":38.618192046}]},{"category":"B","timestamp":"2023-12-07 16:30:31","data":[{"key":"y","value":92.4389663402}]},{"category":"A","timestamp":"2023-12-07 16:35:58","data":[{"key":"x","value":63.8398372162},{"key":"z","value":90.1325261576}]}]} +{"rowindex":93,"customer_id":"abc","day":"2023-12-15","flag":0,"event_sequence":[]} +{"rowindex":94,"customer_id":"mno","day":"2023-12-7","flag":0,"event_sequence":[]} +{"rowindex":95,"customer_id":"ghi","day":"2023-12-14","flag":2,"event_sequence":[{"category":"C","timestamp":"2023-12-14 22:37:13","data":[{"key":"x","value":55.3895966386}]},{"category":"B","timestamp":"2023-12-14 15:56:30","data":[{"key":"y","value":87.7140820119},{"key":"x","value":48.3079555774}]},{"category":"D","timestamp":"2023-12-14 06:35:41","data":[{"key":"y","value":60.4608873685},{"key":"x","value":74.6169412477}]}]} +{"rowindex":96,"customer_id":"def","day":"2023-12-1","flag":1,"event_sequence":[{"category":"D","timestamp":"2023-12-01 07:57:31","data":[{"key":"w","value":83.8985453363},{"key":"x","value":37.6937609678}]},{"category":"A","timestamp":"2023-12-01 00:44:54","data":[{"key":"w","value":65.3980461559}]},{"category":"D","timestamp":"2023-12-01 17:43:00","data":[]},{"category":"A","timestamp":"2023-12-01 02:48:33","data":[{"key":"z","value":23.8579933054}]},{"category":"B","timestamp":"2023-12-01 07:36:21","data":[{"key":"y","value":53.0811307247}]}]} +{"rowindex":97,"customer_id":"ghi","day":"2023-12-5","flag":0,"event_sequence":[{"category":"B","timestamp":"2023-12-05 09:23:03","data":[]},{"category":"C","timestamp":"2023-12-05 01:22:08","data":[]}]} +{"rowindex":98,"customer_id":"ghi","day":"2023-12-14","flag":2,"event_sequence":[]} +{"rowindex":99,"customer_id":"ghi","day":"2023-12-14","flag":1,"event_sequence":[{"category":"B","timestamp":"2023-12-14 15:46:06","data":[{"key":"z","value":48.2733214833}]},{"category":"D","timestamp":"2023-12-14 15:39:56","data":[]},{"category":"D","timestamp":"2023-12-14 17:18:14","data":[]},{"category":"D","timestamp":"2023-12-14 02:41:54","data":[{"key":"z","value":98.7008514491},{"key":"x","value":55.3757151027}]},{"category":"C","timestamp":"2023-12-14 07:54:49","data":[{"key":"z","value":69.8181005179}]}]} diff --git a/tests/data/nested_schema.json b/tests/data/nested_schema.json new file mode 100644 index 0000000000..c3fa39b36d --- /dev/null +++ b/tests/data/nested_schema.json @@ -0,0 +1,56 @@ +[ + { + "mode": "REQUIRED", + "name": "rowindex", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "customer_id", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "day", + "type": "DATE" + }, + { + "mode": "NULLABLE", + "name": "flag", + "type": "INTEGER" + }, + { + "fields": [ + { + "fields": [ + { + "mode": "NULLABLE", + "name": "value", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "key", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "data", + "type": "RECORD" + }, + { + "mode": "NULLABLE", + "name": "timestamp", + "type": "TIMESTAMP" + }, + { + "mode": "NULLABLE", + "name": "category", + "type": "STRING" + } + ], + "mode": "REPEATED", + "name": "event_sequence", + "type": "RECORD" + } +] diff --git a/tests/system/conftest.py b/tests/system/conftest.py index 0ad4280497..4aa27d6a19 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -231,6 +231,7 @@ def load_test_data_tables( for table_name, schema_filename, data_filename in [ ("scalars", "scalars_schema.json", "scalars.jsonl"), ("scalars_too", "scalars_schema.json", "scalars.jsonl"), + ("nested", "nested_schema.json", "nested.jsonl"), ("penguins", "penguins_schema.json", "penguins.jsonl"), ("time_series", "time_series_schema.json", "time_series.jsonl"), ("hockey_players", "hockey_players.json", "hockey_players.jsonl"), @@ -295,6 +296,11 @@ def scalars_table_tokyo(test_data_tables_tokyo) -> str: return test_data_tables_tokyo["scalars"] +@pytest.fixture(scope="session") +def nested_table_id(test_data_tables) -> str: + return test_data_tables["nested"] + + @pytest.fixture(scope="session") def penguins_table_id(test_data_tables) -> str: return test_data_tables["penguins"] @@ -315,6 +321,28 @@ def matrix_3by4_table_id(test_data_tables) -> str: return test_data_tables["matrix_3by4"] +@pytest.fixture(scope="session") +def nested_df( + nested_table_id: str, session: bigframes.Session +) -> bigframes.dataframe.DataFrame: + """DataFrame pointing at test data.""" + return session.read_gbq(nested_table_id, index_col="rowindex") + + +@pytest.fixture(scope="session") +def nested_pandas_df() -> pd.DataFrame: + """pd.DataFrame pointing at test data.""" + + df = pd.read_json( + DATA_DIR / "nested.jsonl", + lines=True, + ) + convert_pandas_dtypes(df, bytes_col=True) + + df = df.set_index("rowindex") + return df + + @pytest.fixture(scope="session") def scalars_df_default_index( scalars_df_index: bigframes.dataframe.DataFrame, diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 4ae31fa4a0..3d31253021 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -1251,9 +1251,7 @@ def test_get_dtypes(scalars_df_default_index): ) -def test_get_dtypes_array_struct(session): - """We may upgrade struct and array to proper arrow dtype support in future. For now, - we return python objects""" +def test_get_dtypes_array_struct_query(session): df = session.read_gbq( """SELECT [1, 3, 2] AS array_column, @@ -1281,6 +1279,41 @@ def test_get_dtypes_array_struct(session): ) +def test_get_dtypes_array_struct_table(nested_df): + dtypes = nested_df.dtypes + pd.testing.assert_series_equal( + dtypes, + pd.Series( + { + "customer_id": pd.StringDtype(storage="pyarrow"), + "day": pd.ArrowDtype(pa.date32()), + "flag": pd.Int64Dtype(), + "event_sequence": pd.ArrowDtype( + pa.list_( + pa.struct( + [ + ( + "data", + pa.list_( + pa.struct( + [ + ("value", pa.float64()), + ("key", pa.string()), + ], + ), + ), + ), + ("timestamp", pa.timestamp("us", "UTC")), + ("category", pa.string()), + ] + ), + ), + ), + } + ), + ) + + def test_shape(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs bf_result = scalars_df.shape diff --git a/tests/unit/core/test_blocks.py b/tests/unit/core/test_blocks.py index 86715d090c..5a4f0951d3 100644 --- a/tests/unit/core/test_blocks.py +++ b/tests/unit/core/test_blocks.py @@ -75,8 +75,8 @@ def test_block_from_local(data): expected = pandas.DataFrame(data) - block = blocks.block_from_local(data) + block = blocks.Block.from_local(data) pandas.testing.assert_index_equal(block.column_labels, expected.columns) - assert tuple(block.index_labels) == tuple(expected.index.names) + assert tuple(block.index.names) == tuple(expected.index.names) assert block.shape == expected.shape diff --git a/tests/unit/resources.py b/tests/unit/resources.py index b239b04671..967e42548f 100644 --- a/tests/unit/resources.py +++ b/tests/unit/resources.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime from typing import Dict, List, Optional import unittest.mock as mock @@ -49,14 +50,21 @@ def create_bigquery_session( "test_dataset", ) - query_job = mock.create_autospec(google.cloud.bigquery.QueryJob) - type(query_job).destination = mock.PropertyMock( - return_value=anonymous_dataset.table("test_table"), - ) - type(query_job).session_info = google.cloud.bigquery.SessionInfo( - {"sessionInfo": {"sessionId": session_id}}, - ) - bqclient.query.return_value = query_job + def query_mock(query, *args, **kwargs): + query_job = mock.create_autospec(google.cloud.bigquery.QueryJob) + type(query_job).destination = mock.PropertyMock( + return_value=anonymous_dataset.table("test_table"), + ) + type(query_job).session_info = google.cloud.bigquery.SessionInfo( + {"sessionInfo": {"sessionId": session_id}}, + ) + + if query.startswith("SELECT CURRENT_TIMESTAMP()"): + query_job.result = mock.MagicMock(return_value=[[datetime.datetime.now()]]) + + return query_job + + bqclient.query = query_mock clients_provider = mock.create_autospec(bigframes.session.clients.ClientsProvider) type(clients_provider).bqclient = mock.PropertyMock(return_value=bqclient) diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py index 5fc8996993..ea8d0882ae 100644 --- a/tests/unit/session/test_session.py +++ b/tests/unit/session/test_session.py @@ -63,15 +63,31 @@ def test_read_gbq_not_found_tables(not_found_table_id): ], ) def test_read_gbq_external_table_no_drive_access(api_name, query_or_table): - bqclient = mock.create_autospec(google.cloud.bigquery.Client, instance=True) - bqclient.project = "test-project" - bqclient.get_table.side_effect = google.api_core.exceptions.Forbidden( - "Access Denied: BigQuery BigQuery: Permission denied while getting Drive credentials." - ) - session = resources.create_bigquery_session(bqclient=bqclient) + session = resources.create_bigquery_session() + session_query_mock = session.bqclient.query + + def query_mock(query, *args, **kwargs): + if query.lstrip().startswith("SELECT *"): + raise google.api_core.exceptions.Forbidden( + "Access Denied: BigQuery BigQuery: Permission denied while getting Drive credentials." + ) + + return session_query_mock(query, *args, **kwargs) + + session.bqclient.query = query_mock + + def get_table_mock(dataset_ref): + dataset = google.cloud.bigquery.Dataset(dataset_ref) + dataset.location = session._location + return dataset + + session.bqclient.get_table = get_table_mock api = getattr(session, api_name) - with pytest.raises(google.api_core.exceptions.Forbidden): + with pytest.raises( + google.api_core.exceptions.Forbidden, + match="Check https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions.", + ): api(query_or_table) diff --git a/tests/unit/test_compute_options.py b/tests/unit/test_compute_options.py index a613bca7b9..2de715a40e 100644 --- a/tests/unit/test_compute_options.py +++ b/tests/unit/test_compute_options.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from unittest import mock + import bigframes as bf from . import resources @@ -18,8 +20,8 @@ def test_maximum_bytes_option(): session = resources.create_bigquery_session() + session.bqclient.query = mock.MagicMock() with bf.option_context("compute.maximum_bytes_billed", 10000): - session.bqclient.query.reset_mock() session._start_query("query") call = session.bqclient.query.call_args assert call.kwargs["job_config"].maximum_bytes_billed == 10000 diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 9f415f3bc4..a7f45efc85 100644 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -16,6 +16,7 @@ import pandas import bigframes.core as core +import bigframes.core.expression as ex import bigframes.core.ordering import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -196,7 +197,9 @@ def test_arrayvalue_to_ibis_expr_with_aggregate(): total_ordering_columns=["col1"], ) expr = value.aggregate( - aggregations=(("col1", agg_ops.sum_op, "col4"),), + aggregations=( + (ex.UnaryAggregation(agg_ops.sum_op, ex.free_var("col1")), "col4"), + ), by_column_ids=["col1"], dropna=False, )._compile_ordered() diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 93fba9f3aa..05f4167838 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -3296,19 +3296,19 @@ def apply(self, func, *, args=(), **kwargs): >>> df = bpd.DataFrame({'col1': [1, 2], 'col2': [3, 4]}) >>> df - col1 col2 - 0 1 3 - 1 2 4 + col1 col2 + 0 1 3 + 1 2 4 [2 rows x 2 columns] - >>> def sqaure(x): + >>> def square(x): ... return x * x - >>> df1 = df.apply(sqaure) - >>> df + + >>> df.apply(square) col1 col2 - 0 1 3 - 1 2 4 + 0 1 9 + 1 4 16 [2 rows x 2 columns]