Skip to content

refactor: all ArrayValue ops return only ArrayValue #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
11 commits merged
Oct 12, 2023
15 changes: 7 additions & 8 deletions bigframes/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def column_ids(self) -> typing.Sequence[str]:
return tuple(self._column_names.keys())

@property
def hidden_ordering_columns(self) -> typing.Tuple[ibis_types.Value, ...]:
return self._hidden_ordering_columns
def _hidden_column_ids(self) -> typing.Sequence[str]:
return tuple(self._hidden_ordering_column_names.keys())

@property
def _reduced_predicate(self) -> typing.Optional[ibis_types.BooleanValue]:
Expand Down Expand Up @@ -400,24 +400,23 @@ def _hide_column(self, column_id) -> ArrayValue:
expr_builder.ordering = self._ordering.with_column_remap({column_id: new_name})
return expr_builder.build()

def promote_offsets(self) -> typing.Tuple[ArrayValue, str]:
def promote_offsets(self, col_id: str) -> ArrayValue:
"""
Convenience function to promote copy of column offsets to a value column. Can be used to reset index.
"""
# Special case: offsets already exist
ordering = self._ordering

if (not ordering.is_sequential) or (not ordering.total_order_col):
return self._project_offsets().promote_offsets()
col_id = bigframes.core.guid.generate_guid()
return self._project_offsets().promote_offsets(col_id)
expr_builder = self.builder()
expr_builder.columns = [
self._get_any_column(ordering.total_order_col.column_id).name(col_id),
*self.columns,
]
return expr_builder.build(), col_id
return expr_builder.build()

def select_columns(self, column_ids: typing.Sequence[str]):
def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
return self._projection(
[self._get_ibis_column(col_id) for col_id in column_ids]
)
Expand Down Expand Up @@ -807,7 +806,7 @@ def _create_order_columns(
elif ordering_mode == "string_encoded":
return (self._create_string_ordering_column().name(order_col_name),)
elif expose_hidden_cols:
return self.hidden_ordering_columns
return self._hidden_ordering_columns
return ()

def _create_offset_column(self) -> ibis_types.IntegerColumn:
Expand Down
8 changes: 4 additions & 4 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def equals(block1: blocks.Block, block2: blocks.Block) -> bool:

equality_ids = []
for lcol, rcol in zip(block1.value_columns, block2.value_columns):
lcolmapped = lmap(lcol)
rcolmapped = rmap(rcol)
lcolmapped = lmap[lcol]
rcolmapped = rmap[rcol]
joined_block, result_id = joined_block.apply_binary_op(
lcolmapped, rcolmapped, ops.eq_nulls_match_op
)
Expand Down Expand Up @@ -563,8 +563,8 @@ def align_rows(
joined_index, (get_column_left, get_column_right) = left_block.index.join(
right_block.index, how=join
)
left_columns = [get_column_left(col) for col in left_block.value_columns]
right_columns = [get_column_right(col) for col in right_block.value_columns]
left_columns = [get_column_left[col] for col in left_block.value_columns]
right_columns = [get_column_right[col] for col in right_block.value_columns]

left_block = joined_index._block.select_columns(left_columns)
right_block = joined_index._block.select_columns(right_columns)
Expand Down
125 changes: 71 additions & 54 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import bigframes.core.guid as guid
import bigframes.core.indexes as indexes
import bigframes.core.joins as joins
import bigframes.core.joins.name_resolution as join_names
import bigframes.core.ordering as ordering
import bigframes.core.utils
import bigframes.core.utils as utils
Expand Down Expand Up @@ -97,7 +98,8 @@ def __init__(
"'index_columns' and 'index_labels' must have equal length"
)
if len(index_columns) == 0:
expr, new_index_col_id = expr.promote_offsets()
new_index_col_id = guid.generate_guid()
expr = expr.promote_offsets(new_index_col_id)
index_columns = [new_index_col_id]
self._index_columns = tuple(index_columns)
# Index labels don't need complicated hierarchical access so can store as tuple
Expand Down Expand Up @@ -260,7 +262,8 @@ def reset_index(self, drop: bool = True) -> Block:
from Index classes that point to this block.
"""
block = self
expr, new_index_col_id = self._expr.promote_offsets()
new_index_col_id = guid.generate_guid()
expr = self._expr.promote_offsets(new_index_col_id)
if drop:
# Even though the index might be part of the ordering, keep that
# ordering expression as reset_index shouldn't change the row
Expand Down Expand Up @@ -833,7 +836,8 @@ def aggregate_all_and_stack(
else: # axis_n == 1
# using offsets as identity to group on.
# TODO: Allow to promote identity/total_order columns instead for better perf
expr_with_offsets, offset_col = self.expr.promote_offsets()
offset_col = guid.generate_guid()
expr_with_offsets = self.expr.promote_offsets(offset_col)
stacked_expr = expr_with_offsets.unpivot(
row_labels=self.column_labels.to_list(),
index_col_ids=[guid.generate_guid()],
Expand Down Expand Up @@ -952,9 +956,10 @@ def aggregate(
]
by_column_labels = self._get_labels_for_columns(by_value_columns)
labels = (*by_column_labels, *aggregate_labels)
result_expr_pruned, offsets_id = result_expr.select_columns(
offsets_id = guid.generate_guid()
result_expr_pruned = result_expr.select_columns(
[*by_value_columns, *output_col_ids]
).promote_offsets()
).promote_offsets(offsets_id)

return (
Block(
Expand All @@ -975,7 +980,8 @@ def get_stat(self, column_id: str, stat: agg_ops.AggregateOp):

aggregations = [(column_id, stat, stat.name) for stat in stats_to_fetch]
expr = self.expr.aggregate(aggregations)
expr, offset_index_id = expr.promote_offsets()
offset_index_id = guid.generate_guid()
expr = expr.promote_offsets(offset_index_id)
block = Block(
expr,
index_columns=[offset_index_id],
Expand All @@ -999,7 +1005,8 @@ def get_corr_stat(self, column_id_left: str, column_id_right: str):
)
]
expr = self.expr.corr_aggregate(corr_aggregations)
expr, offset_index_id = expr.promote_offsets()
offset_index_id = guid.generate_guid()
expr = expr.promote_offsets(offset_index_id)
block = Block(
expr,
index_columns=[offset_index_id],
Expand Down Expand Up @@ -1197,7 +1204,8 @@ def retrieve_repr_request_results(
return formatted_df, count, query_job

def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
expr, result_id = self._expr.promote_offsets()
result_id = guid.generate_guid()
expr = self._expr.promote_offsets(result_id)
return (
Block(
expr,
Expand Down Expand Up @@ -1471,67 +1479,76 @@ def merge(
"outer",
"right",
],
left_col_ids: typing.Sequence[str],
right_col_ids: typing.Sequence[str],
left_join_ids: typing.Sequence[str],
right_join_ids: typing.Sequence[str],
sort: bool,
suffixes: tuple[str, str] = ("_x", "_y"),
) -> Block:
(
joined_expr,
coalesced_join_cols,
(get_column_left, get_column_right),
) = joins.join_by_column(
joined_expr = joins.join_by_column(
self.expr,
left_col_ids,
left_join_ids,
other.expr,
right_col_ids,
right_join_ids,
how=how,
sort=sort,
)
get_column_left, get_column_right = join_names.JOIN_NAME_REMAPPER(
self.expr.column_ids, other.expr.column_ids
)
result_columns = []
matching_join_labels = []

coalesced_ids = []
for left_id, right_id in zip(left_join_ids, right_join_ids):
coalesced_id = guid.generate_guid()
joined_expr = joined_expr.project_binary_op(
get_column_left[left_id],
get_column_right[right_id],
ops.coalesce_op,
coalesced_id,
)
coalesced_ids.append(coalesced_id)

for col_id in self.value_columns:
if col_id in left_join_ids:
key_part = left_join_ids.index(col_id)
matching_right_id = right_join_ids[key_part]
if (
self.col_id_to_label[col_id]
== other.col_id_to_label[matching_right_id]
):
matching_join_labels.append(self.col_id_to_label[col_id])
result_columns.append(coalesced_ids[key_part])
else:
result_columns.append(get_column_left[col_id])
else:
result_columns.append(get_column_left[col_id])
for col_id in other.value_columns:
if col_id in right_join_ids:
key_part = right_join_ids.index(col_id)
if other.col_id_to_label[matching_right_id] in matching_join_labels:
pass
else:
result_columns.append(get_column_right[col_id])
else:
result_columns.append(get_column_right[col_id])

# which join key parts should be coalesced
merge_join_key_mask = [
str(self.col_id_to_label[left_id]) == str(other.col_id_to_label[right_id])
for left_id, right_id in zip(left_col_ids, right_col_ids)
]
labels_to_coalesce = [
self.col_id_to_label[col_id]
for i, col_id in enumerate(left_col_ids)
if merge_join_key_mask[i]
]

def left_col_mapping(col_id: str) -> str:
if col_id in left_col_ids:
join_key_part = left_col_ids.index(col_id)
if merge_join_key_mask[join_key_part]:
return coalesced_join_cols[join_key_part]
return get_column_left(col_id)

def right_col_mapping(col_id: str) -> typing.Optional[str]:
if col_id in right_col_ids:
join_key_part = right_col_ids.index(col_id)
if merge_join_key_mask[join_key_part]:
return None
return get_column_right(col_id)

left_columns = [left_col_mapping(col_id) for col_id in self.value_columns]

right_columns = [
typing.cast(str, right_col_mapping(col_id))
for col_id in other.value_columns
if right_col_mapping(col_id)
]
if sort:
# sort uses coalesced join keys always
joined_expr = joined_expr.order_by(
[ordering.OrderingColumnReference(col_id) for col_id in coalesced_ids],
stable=True,
)

expr = joined_expr.select_columns([*left_columns, *right_columns])
joined_expr = joined_expr.select_columns(result_columns)
labels = utils.merge_column_labels(
self.column_labels,
other.column_labels,
coalesce_labels=labels_to_coalesce,
coalesce_labels=matching_join_labels,
suffixes=suffixes,
)

# Constructs default index
expr, offset_index_id = expr.promote_offsets()
offset_index_id = guid.generate_guid()
expr = joined_expr.promote_offsets(offset_index_id)
return Block(expr, index_columns=[offset_index_id], column_labels=labels)

def _force_reproject(self) -> Block:
Expand Down
Loading