diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index ccfd682215..6c78a07f3b 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -211,8 +211,8 @@ def column_ids(self) -> typing.Sequence[str]: return tuple(self._column_names.keys()) @property - def hidden_ordering_columns(self) -> typing.Tuple[ibis_types.Value, ...]: - return self._hidden_ordering_columns + def _hidden_column_ids(self) -> typing.Sequence[str]: + return tuple(self._hidden_ordering_column_names.keys()) @property def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]: @@ -400,7 +400,7 @@ def _hide_column(self, column_id) -> ArrayValue: expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name}) return expr_builder.build() - def promote_offsets(self) -> typing.Tuple[ArrayValue, str]: + def promote_offsets(self, col_id: str) -> ArrayValue: """ Convenience function to promote copy of column offsets to a value column. Can be used to reset index. """ @@ -408,16 +408,15 @@ def promote_offsets(self) -> typing.Tuple[ArrayValue, str]: ordering = self._ordering if (not ordering.is_sequential) or (not ordering.total_order_col): - return self._project_offsets().promote_offsets() - col_id = bigframes.core.guid.generate_guid() + return self._project_offsets().promote_offsets(col_id) expr_builder = self.builder() expr_builder.columns = [ self._get_any_column(ordering.total_order_col.column_id).name(col_id), *self.columns, ] - return expr_builder.build(), col_id + return expr_builder.build() - def select_columns(self, column_ids: typing.Sequence[str]): + def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: return self._projection( [self._get_ibis_column(col_id) for col_id in column_ids] ) @@ -807,7 +806,7 @@ def _create_order_columns( elif ordering_mode == "string_encoded": return (self._create_string_ordering_column().name(order_col_name),) elif expose_hidden_cols: - return self.hidden_ordering_columns + return self._hidden_ordering_columns return () def _create_offset_column(self) -> ibis_types.IntegerColumn: diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 904da7f312..b0f05f4798 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -40,8 +40,8 @@ def equals(block1: blocks.Block, block2: blocks.Block) -> bool: equality_ids = [] for lcol, rcol in zip(block1.value_columns, block2.value_columns): - lcolmapped = lmap(lcol) - rcolmapped = rmap(rcol) + lcolmapped = lmap[lcol] + rcolmapped = rmap[rcol] joined_block, result_id = joined_block.apply_binary_op( lcolmapped, rcolmapped, ops.eq_nulls_match_op ) @@ -563,8 +563,8 @@ def align_rows( joined_index, (get_column_left, get_column_right) = left_block.index.join( right_block.index, how=join ) - left_columns = [get_column_left(col) for col in left_block.value_columns] - right_columns = [get_column_right(col) for col in right_block.value_columns] + left_columns = [get_column_left[col] for col in left_block.value_columns] + right_columns = [get_column_right[col] for col in right_block.value_columns] left_block = joined_index._block.select_columns(left_columns) right_block = joined_index._block.select_columns(right_columns) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 9b49645c71..4548fca593 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -39,6 +39,7 @@ import bigframes.core.guid as guid import bigframes.core.indexes as indexes import bigframes.core.joins as joins +import 
bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as ordering import bigframes.core.utils import bigframes.core.utils as utils @@ -97,7 +98,8 @@ def __init__( "'index_columns' and 'index_labels' must have equal length" ) if len(index_columns) == 0: - expr, new_index_col_id = expr.promote_offsets() + new_index_col_id = guid.generate_guid() + expr = expr.promote_offsets(new_index_col_id) index_columns = [new_index_col_id] self._index_columns = tuple(index_columns) # Index labels don't need complicated hierarchical access so can store as tuple @@ -260,7 +262,8 @@ def reset_index(self, drop: bool = True) -> Block: from Index classes that point to this block. """ block = self - expr, new_index_col_id = self._expr.promote_offsets() + new_index_col_id = guid.generate_guid() + expr = self._expr.promote_offsets(new_index_col_id) if drop: # Even though the index might be part of the ordering, keep that # ordering expression as reset_index shouldn't change the row @@ -833,7 +836,8 @@ def aggregate_all_and_stack( else: # axis_n == 1 # using offsets as identity to group on. # TODO: Allow to promote identity/total_order columns instead for better perf - expr_with_offsets, offset_col = self.expr.promote_offsets() + offset_col = guid.generate_guid() + expr_with_offsets = self.expr.promote_offsets(offset_col) stacked_expr = expr_with_offsets.unpivot( row_labels=self.column_labels.to_list(), index_col_ids=[guid.generate_guid()], @@ -952,9 +956,10 @@ def aggregate( ] by_column_labels = self._get_labels_for_columns(by_value_columns) labels = (*by_column_labels, *aggregate_labels) - result_expr_pruned, offsets_id = result_expr.select_columns( + offsets_id = guid.generate_guid() + result_expr_pruned = result_expr.select_columns( [*by_value_columns, *output_col_ids] - ).promote_offsets() + ).promote_offsets(offsets_id) return ( Block( @@ -975,7 +980,8 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp): aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch] expr = self.expr.aggregate(aggregations) - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = expr.promote_offsets(offset_index_id) block = Block( expr, index_columns=[offset_index_id], @@ -999,7 +1005,8 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str): ) ] expr = self.expr.corr_aggregate(corr_aggregations) - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = expr.promote_offsets(offset_index_id) block = Block( expr, index_columns=[offset_index_id], @@ -1197,7 +1204,8 @@ def retrieve_repr_request_results( return formatted_df, count, query_job def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]: - expr, result_id = self._expr.promote_offsets() + result_id = guid.generate_guid() + expr = self._expr.promote_offsets(result_id) return ( Block( expr, @@ -1471,67 +1479,76 @@ def merge( "outer", "right", ], - left_col_ids: typing.Sequence[str], - right_col_ids: typing.Sequence[str], + left_join_ids: typing.Sequence[str], + right_join_ids: typing.Sequence[str], sort: bool, suffixes: tuple[str, str] = ("_x", "_y"), ) -> Block: - ( - joined_expr, - coalesced_join_cols, - (get_column_left, get_column_right), - ) = joins.join_by_column( + joined_expr = joins.join_by_column( self.expr, - left_col_ids, + left_join_ids, other.expr, - right_col_ids, + right_join_ids, how=how, - sort=sort, ) + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + 
self.expr.column_ids, other.expr.column_ids + ) + result_columns = [] + matching_join_labels = [] + + coalesced_ids = [] + for left_id, right_id in zip(left_join_ids, right_join_ids): + coalesced_id = guid.generate_guid() + joined_expr = joined_expr.project_binary_op( + get_column_left[left_id], + get_column_right[right_id], + ops.coalesce_op, + coalesced_id, + ) + coalesced_ids.append(coalesced_id) + + for col_id in self.value_columns: + if col_id in left_join_ids: + key_part = left_join_ids.index(col_id) + matching_right_id = right_join_ids[key_part] + if ( + self.col_id_to_label[col_id] + == other.col_id_to_label[matching_right_id] + ): + matching_join_labels.append(self.col_id_to_label[col_id]) + result_columns.append(coalesced_ids[key_part]) + else: + result_columns.append(get_column_left[col_id]) + else: + result_columns.append(get_column_left[col_id]) + for col_id in other.value_columns: + if col_id in right_join_ids: + key_part = right_join_ids.index(col_id) + if other.col_id_to_label[matching_right_id] in matching_join_labels: + pass + else: + result_columns.append(get_column_right[col_id]) + else: + result_columns.append(get_column_right[col_id]) - # which join key parts should be coalesced - merge_join_key_mask = [ - str(self.col_id_to_label[left_id]) == str(other.col_id_to_label[right_id]) - for left_id, right_id in zip(left_col_ids, right_col_ids) - ] - labels_to_coalesce = [ - self.col_id_to_label[col_id] - for i, col_id in enumerate(left_col_ids) - if merge_join_key_mask[i] - ] - - def left_col_mapping(col_id: str) -> str: - if col_id in left_col_ids: - join_key_part = left_col_ids.index(col_id) - if merge_join_key_mask[join_key_part]: - return coalesced_join_cols[join_key_part] - return get_column_left(col_id) - - def right_col_mapping(col_id: str) -> typing.Optional[str]: - if col_id in right_col_ids: - join_key_part = right_col_ids.index(col_id) - if merge_join_key_mask[join_key_part]: - return None - return get_column_right(col_id) - - left_columns = [left_col_mapping(col_id) for col_id in self.value_columns] - - right_columns = [ - typing.cast(str, right_col_mapping(col_id)) - for col_id in other.value_columns - if right_col_mapping(col_id) - ] + if sort: + # sort uses coalesced join keys always + joined_expr = joined_expr.order_by( + [ordering.OrderingColumnReference(col_id) for col_id in coalesced_ids], + stable=True, + ) - expr = joined_expr.select_columns([*left_columns, *right_columns]) + joined_expr = joined_expr.select_columns(result_columns) labels = utils.merge_column_labels( self.column_labels, other.column_labels, - coalesce_labels=labels_to_coalesce, + coalesce_labels=matching_join_labels, suffixes=suffixes, ) - # Constructs default index - expr, offset_index_id = expr.promote_offsets() + offset_index_id = guid.generate_guid() + expr = joined_expr.promote_offsets(offset_index_id) return Block(expr, index_columns=[offset_index_id], column_labels=labels) def _force_reproject(self) -> Block: diff --git a/bigframes/core/indexes/index.py b/bigframes/core/indexes/index.py index 7d15e67649..677bb8529c 100644 --- a/bigframes/core/indexes/index.py +++ b/bigframes/core/indexes/index.py @@ -17,7 +17,7 @@ from __future__ import annotations import typing -from typing import Callable, Sequence, Tuple, Union +from typing import Mapping, Sequence, Tuple, Union import numpy as np import pandas @@ -27,6 +27,7 @@ import bigframes.core.block_transforms as block_ops import bigframes.core.blocks as blocks import bigframes.core.joins as joins +import 
bigframes.core.joins.name_resolution as join_names import bigframes.core.ordering as order import bigframes.core.utils as utils import bigframes.dtypes @@ -413,7 +414,7 @@ def join( how="left", sort=False, block_identity_join: bool = False, - ) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: + ) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: if not isinstance(other, IndexValue): # TODO(swast): We need to improve this error message to be more # actionable for the user. For example, it's possible they @@ -456,27 +457,34 @@ def join_mono_indexed( how="left", sort=False, block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: - ( - combined_expr, - joined_index_col_names, - (get_column_left, get_column_right), - ) = joins.join_by_column( +) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: + left_expr = left._block.expr + right_expr = right._block.expr + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + left_expr.column_ids, right_expr.column_ids + ) + combined_expr = joins.join_by_column( left._block.expr, left._block.index_columns, right._block.expr, right._block.index_columns, how=how, - sort=sort, allow_row_identity_join=(not block_identity_join), ) # Drop original indices from each side. and used the coalesced combination generated by the join. - left_indices = [get_column_left(col_id) for col_id in left._block.index_columns] - right_indices = [get_column_right(col_id) for col_id in right._block.index_columns] - combined_expr = combined_expr.drop_columns(left_indices).drop_columns(right_indices) + left_index = get_column_left[left._block.index_columns[0]] + right_index = get_column_right[right._block.index_columns[0]] + # Drop original indices from each side. and used the coalesced combination generated by the join. + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, [left_index], [right_index], how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) block = blocks.Block( combined_expr, - index_columns=[*joined_index_col_names], + index_columns=coalesced_join_cols, column_labels=[*left._block.column_labels, *right._block.column_labels], index_labels=[left.name] if left.name == right.name else [None], ) @@ -493,7 +501,7 @@ def join_multi_indexed( how="left", sort=False, block_identity_join: bool = False, -) -> Tuple[IndexValue, Tuple[Callable[[str], str], Callable[[str], str]],]: +) -> Tuple[IndexValue, Tuple[Mapping[str, str], Mapping[str, str]],]: if not (left.is_uniquely_named() and right.is_uniquely_named()): raise ValueError("Joins not supported on indices with non-unique level names") @@ -508,25 +516,33 @@ def join_multi_indexed( right_join_ids = [right.resolve_level_name(name) for name in common_names] names_fully_match = len(left_only_names) == 0 and len(right_only_names) == 0 - ( - combined_expr, - joined_index_col_names, - (get_column_left, get_column_right), - ) = joins.join_by_column( - left._block.expr, + + left_expr = left._block.expr + right_expr = right._block.expr + get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER( + left_expr.column_ids, right_expr.column_ids + ) + + combined_expr = joins.join_by_column( + left_expr, left_join_ids, - right._block.expr, + right_expr, right_join_ids, how=how, - sort=sort, # If we're only joining on a subset of the index columns, we need to # perform a true join. 
- allow_row_identity_join=names_fully_match and not block_identity_join, + allow_row_identity_join=(names_fully_match and not block_identity_join), ) + left_ids_post_join = [get_column_left[id] for id in left_join_ids] + right_ids_post_join = [get_column_right[id] for id in right_join_ids] # Drop original indices from each side. and used the coalesced combination generated by the join. - combined_expr = combined_expr.drop_columns( - [get_column_left(col) for col in left_join_ids] - ).drop_columns([get_column_right(col) for col in right_join_ids]) + combined_expr, coalesced_join_cols = coalesce_columns( + combined_expr, left_ids_post_join, right_ids_post_join, how=how + ) + if sort: + combined_expr = combined_expr.order_by( + [order.OrderingColumnReference(col_id) for col_id in coalesced_join_cols] + ) if left.nlevels == 1: index_labels = right.names @@ -536,12 +552,13 @@ def join_multi_indexed( index_labels = [*common_names, *left_only_names, *right_only_names] def resolve_label_id(label: blocks.Label) -> str: + # if name is shared between both blocks, coalesce the values if label in common_names: - return joined_index_col_names[common_names.index(label)] + return coalesced_join_cols[common_names.index(label)] if label in left_only_names: - return get_column_left(left.resolve_level_name(label)) + return get_column_left[left.resolve_level_name(label)] if label in right_only_names: - return get_column_right(right.resolve_level_name(label)) + return get_column_right[right.resolve_level_name(label)] raise ValueError(f"Unexpected label: {label}") index_columns = [resolve_label_id(label) for label in index_labels] @@ -556,3 +573,29 @@ def resolve_label_id(label: blocks.Label) -> str: typing.cast(IndexValue, block.index), (get_column_left, get_column_right), ) + + +def coalesce_columns( + expr: core.ArrayValue, + left_ids: typing.Sequence[str], + right_ids: typing.Sequence[str], + how: str, +) -> Tuple[core.ArrayValue, Sequence[str]]: + result_ids = [] + for left_id, right_id in zip(left_ids, right_ids): + if how == "left" or how == "inner": + result_ids.append(left_id) + expr = expr.drop_columns([right_id]) + elif how == "right": + result_ids.append(right_id) + expr = expr.drop_columns([left_id]) + elif how == "outer": + coalesced_id = bigframes.core.guid.generate_guid() + expr = expr.project_binary_op( + left_id, right_id, ops.coalesce_op, coalesced_id + ) + expr = expr.drop_columns([left_id, right_id]) + result_ids.append(coalesced_id) + else: + raise ValueError(f"Unexpected join type: {how}. {constants.FEEDBACK_LINK}") + return expr, result_ids diff --git a/bigframes/core/joins/name_resolution.py b/bigframes/core/joins/name_resolution.py new file mode 100644 index 0000000000..df946b3a59 --- /dev/null +++ b/bigframes/core/joins/name_resolution.py @@ -0,0 +1,46 @@ +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from __future__ import annotations + +from typing import Mapping, Sequence, Tuple + + +class JoinNameRemapper: + def __init__(self, namespace: str) -> None: + self._namespace = namespace + + def __call__( + self, left_column_ids: Sequence[str], right_column_ids: Sequence[str] + ) -> Tuple[Mapping[str, str], Mapping[str, str]]: + """ + When joining column ids from different namespaces, this function defines how names are remapped. + + Take care to map value column ids and hidden column ids in separate namespaces. This is important because value + column ids must be deterministic as they are referenced by dependent operators. The generation of hidden ids is + dependent on compilation context, and should be completely separated from value column id mappings. + """ + # This naming strategy depends on the number of value columns in source tables. + # This means column id mappings must be adjusted if pushing operations above or below join in transformation + new_left_ids = { + col: f"{self._namespace}_l_{i}" for i, col in enumerate(left_column_ids) + } + new_right_ids = { + col: f"{self._namespace}_r_{i}" for i, col in enumerate(right_column_ids) + } + return new_left_ids, new_right_ids + + +# Defines how column ids are remapped, regardless of join strategy or ordering mode +# Use this remapper for all value column remappings. +JOIN_NAME_REMAPPER = JoinNameRemapper("bfjoin") diff --git a/bigframes/core/joins/row_identity.py b/bigframes/core/joins/row_identity.py index 156e7aef40..76e456ec94 100644 --- a/bigframes/core/joins/row_identity.py +++ b/bigframes/core/joins/row_identity.py @@ -18,20 +18,20 @@ import functools import typing -from typing import Callable, Tuple import ibis import ibis.expr.types as ibis_types import bigframes.constants as constants import bigframes.core as core +import bigframes.core.joins.name_resolution as naming SUPPORTED_ROW_IDENTITY_HOW = {"outer", "left", "inner"} def join_by_row_identity( left: core.ArrayValue, right: core.ArrayValue, *, how: str -) -> Tuple[core.ArrayValue, Tuple[Callable[[str], str], Callable[[str], str]],]: +) -> core.ArrayValue: """Compute join when we are joining by row identity not a specific column.""" if how not in SUPPORTED_ROW_IDENTITY_HOW: raise NotImplementedError( @@ -62,31 +62,42 @@ def join_by_row_identity( left_mask = left_relative_predicates if how in ["right", "outer"] else None right_mask = right_relative_predicates if how in ["left", "outer"] else None + + # Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result + lpublicmapping, rpublicmapping = naming.JOIN_NAME_REMAPPER( + left.column_ids, right.column_ids + ) + lhiddenmapping, rhiddenmapping = naming.JoinNameRemapper(namespace="hidden")( + left._hidden_column_ids, right._hidden_column_ids + ) + map_left_id = {**lpublicmapping, **lhiddenmapping} + map_right_id = {**rpublicmapping, **rhiddenmapping} + joined_columns = [ - _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id(key)) + _mask_value(left._get_ibis_column(key), left_mask).name(map_left_id[key]) for key in left.column_ids ] + [ - _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id(key)) + _mask_value(right._get_ibis_column(key), right_mask).name(map_right_id[key]) for key in right.column_ids ] # If left isn't being masked, can just use left ordering if not left_mask: col_mapping = { - order_ref.column_id: map_left_id(order_ref.column_id) + order_ref.column_id: map_left_id[order_ref.column_id] for order_ref in left._ordering.ordering_value_columns } 
new_ordering = left._ordering.with_column_remap(col_mapping) else: ordering_columns = [ - col_ref.with_name(map_left_id(col_ref.column_id)) + col_ref.with_name(map_left_id[col_ref.column_id]) for col_ref in left._ordering.ordering_value_columns ] + [ - col_ref.with_name(map_right_id(col_ref.column_id)) + col_ref.with_name(map_right_id[col_ref.column_id]) for col_ref in right._ordering.ordering_value_columns ] left_total_order_cols = frozenset( - map_left_id(col) for col in left._ordering.total_ordering_columns + map_left_id[col] for col in left._ordering.total_ordering_columns ) # Assume that left ordering is sufficient since 1:1 join over same base table join_total_order_cols = left_total_order_cols @@ -95,12 +106,12 @@ def join_by_row_identity( ) hidden_ordering_columns = [ - left._get_hidden_ordering_column(key.column_id).name(map_left_id(key.column_id)) + left._get_hidden_ordering_column(key.column_id).name(map_left_id[key.column_id]) for key in left._ordering.ordering_value_columns if key.column_id in left._hidden_ordering_column_names.keys() ] + [ right._get_hidden_ordering_column(key.column_id).name( - map_right_id(key.column_id) + map_right_id[key.column_id] ) for key in right._ordering.ordering_value_columns if key.column_id in right._hidden_ordering_column_names.keys() @@ -114,18 +125,7 @@ def join_by_row_identity( ordering=new_ordering, predicates=combined_predicates, ) - return joined_expr, ( - lambda key: map_left_id(key), - lambda key: map_right_id(key), - ) - - -def map_left_id(left_side_id): - return f"{left_side_id}_x" - - -def map_right_id(right_side_id): - return f"{right_side_id}_y" + return joined_expr def _mask_value( diff --git a/bigframes/core/joins/single_column.py b/bigframes/core/joins/single_column.py index f194b8f8c4..0c0e2008b5 100644 --- a/bigframes/core/joins/single_column.py +++ b/bigframes/core/joins/single_column.py @@ -16,17 +16,15 @@ from __future__ import annotations -import itertools import typing -from typing import Callable, Literal, Tuple +from typing import Literal, Mapping import ibis import ibis.expr.datatypes as ibis_dtypes import ibis.expr.types as ibis_types -import bigframes.constants as constants import bigframes.core as core -import bigframes.core.guid as guid +import bigframes.core.joins.name_resolution as naming import bigframes.core.joins.row_identity import bigframes.core.ordering @@ -43,13 +41,8 @@ def join_by_column( "outer", "right", ], - sort: bool = False, allow_row_identity_join: bool = True, -) -> Tuple[ - core.ArrayValue, - typing.Sequence[str], - Tuple[Callable[[str], str], Callable[[str], str]], -]: +) -> core.ArrayValue: """Join two expressions by column equality. Arguments: @@ -62,14 +55,9 @@ def join_by_column( If True, allow matching by row identity. Set to False to always perform a true JOIN in generated SQL. Returns: - The joined expression and the objects needed to interpret it. - - * ArrayValue: Joined table with all columns from left and right. - * Sequence[str]: Column IDs of the coalesced join columns. Sometimes either the - left/right table will have missing rows. This column pulls the - non-NULL value from either left/right. - * Tuple[Callable, Callable]: For a given column ID from left or right, - respectively, return the new column id from the combined expression. + The joined expression. The resulting columns will be, in order, + first the coalesced join keys, then, all the left columns, and + finally, all the right columns. 
""" if ( allow_row_identity_join @@ -85,71 +73,33 @@ def join_by_column( for lcol, rcol in zip(left_column_ids, right_column_ids) ) ): - combined_expr, ( - get_column_left, - get_column_right, - ) = bigframes.core.joins.row_identity.join_by_row_identity(left, right, how=how) - left_join_keys = [ - combined_expr._get_ibis_column(get_column_left(col)) - for col in left_column_ids - ] - right_join_keys = [ - combined_expr._get_ibis_column(get_column_right(col)) - for col in right_column_ids - ] - join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how) - join_key_ids = [col.get_name() for col in join_key_cols] - combined_expr = combined_expr._projection( - [*join_key_cols, *combined_expr.columns] - ) - if sort: - combined_expr = combined_expr.order_by( - [ - core.OrderingColumnReference(join_col_id) - for join_col_id in join_key_ids - ] - ) - return ( - combined_expr, - join_key_ids, - ( - get_column_left, - get_column_right, - ), + return bigframes.core.joins.row_identity.join_by_row_identity( + left, right, how=how ) else: - lmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - left.column_ids, left._hidden_ordering_column_names - ) - } - rmapping = { - col_id: guid.generate_guid() - for col_id in itertools.chain( - right.column_ids, right._hidden_ordering_column_names - ) - } - - def get_column_left(col_id): - return lmapping[col_id] - - def get_column_right(col_id): - return rmapping[col_id] + # Value column mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result + l_public_mapping, r_public_mapping = naming.JOIN_NAME_REMAPPER( + left.column_ids, right.column_ids + ) + l_hidden_mapping, r_hidden_mapping = naming.JoinNameRemapper( + namespace="hidden" + )(left._hidden_column_ids, right._hidden_column_ids) + l_mapping = {**l_public_mapping, **l_hidden_mapping} + r_mapping = {**r_public_mapping, **r_hidden_mapping} left_table = left._to_ibis_expr( "unordered", expose_hidden_cols=True, - col_id_overrides=lmapping, + col_id_overrides=l_mapping, ) right_table = right._to_ibis_expr( "unordered", expose_hidden_cols=True, - col_id_overrides=rmapping, + col_id_overrides=r_mapping, ) join_conditions = [ - value_to_join_key(left_table[lmapping[left_index]]) - == value_to_join_key(right_table[rmapping[right_index]]) + value_to_join_key(left_table[l_mapping[left_index]]) + == value_to_join_key(right_table[r_mapping[right_index]]) for left_index, right_index in zip(left_column_ids, right_column_ids) ] @@ -158,97 +108,39 @@ def get_column_right(col_id): right_table, predicates=join_conditions, how=how, - lname="{name}_x", - rname="{name}_y", ) # Preserve ordering accross joins. ordering = join_orderings( left._ordering, right._ordering, - get_column_left, - get_column_right, + l_mapping, + r_mapping, left_order_dominates=(how != "right"), ) - left_join_keys = [ - combined_table[get_column_left(col)] for col in left_column_ids - ] - right_join_keys = [ - combined_table[get_column_right(col)] for col in right_column_ids - ] - join_key_cols = get_coalesced_join_cols(left_join_keys, right_join_keys, how) # We could filter out the original join columns, but predicates/ordering # might still reference them in implicit joins. 
- columns = ( - join_key_cols - + [combined_table[get_column_left(col.get_name())] for col in left.columns] - + [ - combined_table[get_column_right(col.get_name())] - for col in right.columns - ] - ) + columns = [ + combined_table[l_mapping[col.get_name()]] for col in left.columns + ] + [combined_table[r_mapping[col.get_name()]] for col in right.columns] hidden_ordering_columns = [ *[ - combined_table[get_column_left(col.get_name())] - for col in left.hidden_ordering_columns + combined_table[l_hidden_mapping[col.get_name()]] + for col in left._hidden_ordering_columns ], *[ - combined_table[get_column_right(col.get_name())] - for col in right.hidden_ordering_columns + combined_table[r_hidden_mapping[col.get_name()]] + for col in right._hidden_ordering_columns ], ] - combined_expr = core.ArrayValue( + return core.ArrayValue( left._session, combined_table, columns=columns, hidden_ordering_columns=hidden_ordering_columns, ordering=ordering, ) - if sort: - combined_expr = combined_expr.order_by( - [ - core.OrderingColumnReference(join_key_col.get_name()) - for join_key_col in join_key_cols - ] - ) - return ( - combined_expr, - [key.get_name() for key in join_key_cols], - (get_column_left, get_column_right), - ) - - -def get_coalesced_join_cols( - left_join_cols: typing.Iterable[ibis_types.Value], - right_join_cols: typing.Iterable[ibis_types.Value], - how: str, -) -> typing.List[ibis_types.Value]: - join_key_cols: list[ibis_types.Value] = [] - for left_col, right_col in zip(left_join_cols, right_join_cols): - if how == "left" or how == "inner": - join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_"))) - elif how == "right": - join_key_cols.append(right_col.name(guid.generate_guid(prefix="index_"))) - elif how == "outer": - # The left index and the right index might contain null values, for - # example due to an outer join with different numbers of rows. Coalesce - # these to take the index value from either column. - # Use a random name in case the left index and the right index have the - # same name. In such a case, _x and _y suffixes will already be used. - # Don't need to coalesce if they are exactly the same column. - if left_col.name("index").equals(right_col.name("index")): - join_key_cols.append(left_col.name(guid.generate_guid(prefix="index_"))) - else: - join_key_cols.append( - ibis.coalesce( - left_col, - right_col, - ).name(guid.generate_guid(prefix="index_")) - ) - else: - raise ValueError(f"Unexpected join type: {how}. 
{constants.FEEDBACK_LINK}") - return join_key_cols def value_to_join_key(value: ibis_types.Value): @@ -261,16 +153,16 @@ def value_to_join_key(value: ibis_types.Value): def join_orderings( left: core.ExpressionOrdering, right: core.ExpressionOrdering, - left_id_mapping: Callable[[str], str], - right_id_mapping: Callable[[str], str], + left_id_mapping: Mapping[str, str], + right_id_mapping: Mapping[str, str], left_order_dominates: bool = True, ) -> core.ExpressionOrdering: left_ordering_refs = [ - ref.with_name(left_id_mapping(ref.column_id)) + ref.with_name(left_id_mapping[ref.column_id]) for ref in left.all_ordering_columns ] right_ordering_refs = [ - ref.with_name(right_id_mapping(ref.column_id)) + ref.with_name(right_id_mapping[ref.column_id]) for ref in right.all_ordering_columns ] if left_order_dominates: @@ -279,10 +171,10 @@ def join_orderings( joined_refs = [*right_ordering_refs, *left_ordering_refs] left_total_order_cols = frozenset( - [left_id_mapping(id) for id in left.total_ordering_columns] + [left_id_mapping[id] for id in left.total_ordering_columns] ) right_total_order_cols = frozenset( - [right_id_mapping(id) for id in right.total_ordering_columns] + [right_id_mapping[id] for id in right.total_ordering_columns] ) return core.ExpressionOrdering( ordering_value_columns=joined_refs, diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 015a7642f8..c91ddffada 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -419,7 +419,7 @@ def _getitem_bool_series(self, key: bigframes.series.Series) -> DataFrame: get_column_right, ) = self._block.index.join(key._block.index, how="left") block = combined_index._block - filter_col_id = get_column_right(key._value_column) + filter_col_id = get_column_right[key._value_column] block = block.filter(filter_col_id) block = block.drop_columns([filter_col_id]) return DataFrame(block) @@ -560,18 +560,18 @@ def _apply_series_binop( ) series_column_id = other._value_column - series_col = get_column_right(series_column_id) + series_col = get_column_right[series_column_id] block = joined_index._block for column_id, label in zip( self._block.value_columns, self._block.column_labels ): block, _ = block.apply_binary_op( - get_column_left(column_id), + get_column_left[column_id], series_col, op, result_label=label, ) - block = block.drop_columns([get_column_left(column_id)]) + block = block.drop_columns([get_column_left[column_id]]) block = block.drop_columns([series_col]) block = block.with_index_labels(self.index.names) @@ -603,22 +603,22 @@ def _apply_dataframe_binop( left_col_id = self._block.value_columns[left_index] right_col_id = other._block.value_columns[right_index] block, result_col_id = block.apply_binary_op( - get_column_left(left_col_id), - get_column_right(right_col_id), + get_column_left[left_col_id], + get_column_right[right_col_id], op, ) binop_result_ids.append(result_col_id) elif left_index >= 0: left_col_id = self._block.value_columns[left_index] block, result_col_id = block.apply_unary_op( - get_column_left(left_col_id), + get_column_left[left_col_id], ops.partial_right(op, None), ) binop_result_ids.append(result_col_id) elif right_index >= 0: right_col_id = other._block.value_columns[right_index] block, result_col_id = block.apply_unary_op( - get_column_right(right_col_id), + get_column_right[right_col_id], ops.partial_left(op, None), ) binop_result_ids.append(result_col_id) @@ -974,7 +974,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame: block.index ) - new_ordering_col = 
get_column_right(ordering_col) + new_ordering_col = get_column_right[ordering_col] drop_block = joined_index._block drop_block, drop_col = drop_block.apply_unary_op( new_ordering_col, @@ -983,7 +983,7 @@ def _drop_by_index(self, index: indexes.Index) -> DataFrame: drop_block = drop_block.filter(drop_col) original_columns = [ - get_column_left(column) for column in self._block.value_columns + get_column_left[column] for column in self._block.value_columns ] drop_block = drop_block.select_columns(original_columns) return DataFrame(drop_block) @@ -1119,7 +1119,8 @@ def _assign_single_item( # local_df is likely (but not guarunteed) to be cached locally # since the original list came from memory and so is probably < MAX_INLINE_DF_SIZE - this_expr, this_offsets_col_id = self._get_block()._expr.promote_offsets() + this_offsets_col_id = bigframes.core.guid.generate_guid() + this_expr = self._get_block()._expr.promote_offsets(this_offsets_col_id) block = blocks.Block( expr=this_expr, index_labels=self.index.names, @@ -1156,10 +1157,10 @@ def _assign_series_join_on_index( ) column_ids = [ - get_column_left(col_id) for col_id in self._block.cols_matching_label(label) + get_column_left[col_id] for col_id in self._block.cols_matching_label(label) ] block = joined_index._block - source_column = get_column_right(series._value_column) + source_column = get_column_right[series._value_column] # Replace each column matching the label for column_id in column_ids: @@ -2032,8 +2033,8 @@ def _groupby_series( key._block.index, how="inner" if dropna else "left" ) col_ids = [ - *[get_column_left(value) for value in col_ids], - get_column_right(key._value_column), + *[get_column_left[value] for value in col_ids], + get_column_right[key._value_column], ] block = combined_index._block else: diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index f330a703b2..a29dd36c72 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -913,6 +913,16 @@ def ge_op( return x >= y +def coalesce_op( + x: ibis_types.Value, + y: ibis_types.Value, +): + if x.name("name").equals(y.name("name")): + return x + else: + return ibis.coalesce(x, y) + + @short_circuit_nulls(ibis_dtypes.int) def floordiv_op( x: ibis_types.Value, diff --git a/bigframes/operations/base.py b/bigframes/operations/base.py index fc76d07edb..b9abb2cc03 100644 --- a/bigframes/operations/base.py +++ b/bigframes/operations/base.py @@ -196,8 +196,8 @@ def _align_n( get_column_right, ) = block.index.join(other._block.index, how=how) value_ids = [ - *[get_column_left(value) for value in value_ids], - get_column_right(other._value_column), + *[get_column_left[value] for value in value_ids], + get_column_right[other._value_column], ] block = combined_index._block else: diff --git a/bigframes/series.py b/bigframes/series.py index 56e1b43a03..4f2f73bd60 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1136,10 +1136,10 @@ def _groupby_values( key._block.index, how="inner" if dropna else "left" ) - value_col = get_column_left(self._value_column) + value_col = get_column_left[self._value_column] grouping_cols = [ - *[get_column_left(value) for value in grouping_cols], - get_column_right(key._value_column), + *[get_column_left[value] for value in grouping_cols], + get_column_right[key._value_column], ] block = combined_index._block else:
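
For reviewers, a minimal standalone sketch of the deterministic naming scheme that the new `JoinNameRemapper` (added in `bigframes/core/joins/name_resolution.py` above) introduces. This mirrors the class from the diff rather than importing bigframes, and the sample column ids are hypothetical:

```python
from typing import Mapping, Sequence, Tuple


class JoinNameRemapper:
    """Illustrative mirror of the remapper added in this change."""

    def __init__(self, namespace: str) -> None:
        self._namespace = namespace

    def __call__(
        self, left_column_ids: Sequence[str], right_column_ids: Sequence[str]
    ) -> Tuple[Mapping[str, str], Mapping[str, str]]:
        # The new ids depend only on the namespace and the column's position,
        # so the same inputs always produce the same output ids.
        new_left_ids = {
            col: f"{self._namespace}_l_{i}" for i, col in enumerate(left_column_ids)
        }
        new_right_ids = {
            col: f"{self._namespace}_r_{i}" for i, col in enumerate(right_column_ids)
        }
        return new_left_ids, new_right_ids


if __name__ == "__main__":
    remap = JoinNameRemapper("bfjoin")
    # Hypothetical value-column ids from the two sides of a join.
    left_map, right_map = remap(["col_a", "col_b"], ["col_a", "col_c"])
    print(left_map)   # {'col_a': 'bfjoin_l_0', 'col_b': 'bfjoin_l_1'}
    print(right_map)  # {'col_a': 'bfjoin_r_0', 'col_c': 'bfjoin_r_1'}
```

Because the remapped ids are positional and deterministic, both `join_by_column` and its consumers (such as `Block.merge` and the index join helpers) can compute the same mapping independently; that is why the join functions in this change no longer return mapping callables, and why call sites switch from `get_column_left(col)` to dictionary-style `get_column_left[col]` lookups.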