perf: Optimize repr for unordered gbq table #1778

Merged · 3 commits · May 30, 2025
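For context, a minimal sketch of the behavior this change targets, assuming the default `bigframes.pandas` session (whether the frame is actually unordered depends on the session's ordering mode); the new system tests at the bottom of the diff assert that these previews no longer add a query execution.

```python
import bigframes.pandas as bpd

# A table-backed DataFrame without a user-imposed total ordering.
df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

# With this change, previews like these can be served through the BigQuery
# Storage Read API (with any LIMIT pulled up out of the plan) instead of
# issuing another query job.
repr(df)
df.peek(100)
```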
3 changes: 2 additions & 1 deletion bigframes/core/rewrite/__init__.py
@@ -22,7 +22,7 @@
try_reduce_to_local_scan,
try_reduce_to_table_scan,
)
from bigframes.core.rewrite.slices import pull_up_limits, rewrite_slice
from bigframes.core.rewrite.slices import pull_out_limit, pull_up_limits, rewrite_slice
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
from bigframes.core.rewrite.windows import rewrite_range_rolling

@@ -32,6 +32,7 @@
"rewrite_slice",
"rewrite_timedelta_expressions",
"pull_up_limits",
"pull_out_limit",
"remap_variables",
"defer_order",
"column_pruning",
8 changes: 4 additions & 4 deletions bigframes/core/rewrite/slices.py
@@ -26,7 +26,7 @@


def pull_up_limits(root: nodes.ResultNode) -> nodes.ResultNode:
new_child, pulled_limit = _pullup_slice_inner(root.child)
new_child, pulled_limit = pull_out_limit(root.child)
if new_child == root.child:
return root
elif pulled_limit is None:
Expand All @@ -37,7 +37,7 @@ def pull_up_limits(root: nodes.ResultNode) -> nodes.ResultNode:
return dataclasses.replace(root, child=new_child, limit=new_limit)


def _pullup_slice_inner(
def pull_out_limit(
root: nodes.BigFrameNode,
) -> Tuple[nodes.BigFrameNode, Optional[int]]:
"""
@@ -53,15 +53,15 @@ def _pullup_slice_inner(
assert root.step == 1
assert root.stop is not None
limit = root.stop
new_root, prior_limit = _pullup_slice_inner(root.child)
new_root, prior_limit = pull_out_limit(root.child)
if (prior_limit is not None) and (prior_limit < limit):
limit = prior_limit
return new_root, limit
elif (
isinstance(root, (nodes.SelectionNode, nodes.ProjectionNode))
and root.row_preserving
):
new_child, prior_limit = _pullup_slice_inner(root.child)
new_child, prior_limit = pull_out_limit(root.child)
if prior_limit is not None:
return root.transform_children(lambda _: new_child), prior_limit
# Most ops don't support pulling up slice, like filter, agg, join, etc.
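For readers outside the rewrite module, here is a self-contained sketch of what the newly exported `pull_out_limit` does, using toy node classes (`Limit`, `Projection`, `Table` are illustrative stand-ins, not bigframes types): it strips limit-style slice nodes off the plan, recurses through row-preserving projections, and returns the remaining plan together with the tightest limit removed.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Table:  # stand-in for a leaf scan node
    name: str


@dataclass
class Projection:  # stand-in for a row-preserving SelectionNode/ProjectionNode
    child: object


@dataclass
class Limit:  # stand-in for a SliceNode with start=None, step=1, stop=n
    child: object
    stop: int


def pull_out_limit(node) -> Tuple[object, Optional[int]]:
    if isinstance(node, Limit):
        child, prior = pull_out_limit(node.child)
        # keep the tightest limit seen anywhere above the leaf
        return child, node.stop if prior is None else min(prior, node.stop)
    if isinstance(node, Projection):
        child, prior = pull_out_limit(node.child)
        if prior is not None:
            # limits pass through row-preserving ops unchanged
            return Projection(child), prior
    # filters, aggregations, joins, etc. block the pull-up
    return node, None


plan = Limit(Projection(Limit(Table("t"), stop=500)), stop=300)
assert pull_out_limit(plan) == (Projection(Table("t")), 300)
```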
25 changes: 6 additions & 19 deletions bigframes/core/tree_properties.py
@@ -45,26 +45,13 @@ def can_fast_head(node: nodes.BigFrameNode) -> bool:
# To do fast head operation:
# (1) the underlying data must be arranged/indexed according to the logical ordering
# (2) transformations must support pushing down LIMIT or a filter on row numbers
return has_fast_orderby_limit(node) or has_fast_offset_address(node)


def has_fast_orderby_limit(node: nodes.BigFrameNode) -> bool:
"""True iff ORDER BY LIMIT can be performed without a large full table scan."""
# TODO: In theory compatible with some Slice nodes, potentially by adding OFFSET
if isinstance(node, nodes.LeafNode):
return node.fast_ordered_limit
if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
return has_fast_orderby_limit(node.child)
return False


def has_fast_offset_address(node: nodes.BigFrameNode) -> bool:
"""True iff specific offsets can be scanned without a large full table scan."""
# TODO: In theory can push offset lookups through slice operators by translating indices
if isinstance(node, nodes.LeafNode):
return node.fast_offsets
if isinstance(node, nodes.ReadLocalNode):
# always cheap to push slice into local data
return True
if isinstance(node, nodes.ReadTableNode):
return (node.source.ordering is None) or (node.fast_ordered_limit)
if isinstance(node, (nodes.ProjectionNode, nodes.SelectionNode)):
return has_fast_offset_address(node.child)
return can_fast_head(node.child)
return False


5 changes: 5 additions & 0 deletions bigframes/session/loader.py
@@ -387,6 +387,7 @@ def read_gbq_table( # type: ignore[overload-overlap]
enable_snapshot: bool = ...,
dry_run: Literal[False] = ...,
force_total_order: Optional[bool] = ...,
n_rows: Optional[int] = None,
) -> dataframe.DataFrame:
...

@@ -408,6 +409,7 @@ def read_gbq_table(
enable_snapshot: bool = ...,
dry_run: Literal[True] = ...,
force_total_order: Optional[bool] = ...,
n_rows: Optional[int] = None,
) -> pandas.Series:
...

@@ -428,6 +430,7 @@ def read_gbq_table(
enable_snapshot: bool = True,
dry_run: bool = False,
force_total_order: Optional[bool] = None,
n_rows: Optional[int] = None,
) -> dataframe.DataFrame | pandas.Series:
import bigframes._tools.strings
import bigframes.dataframe as dataframe
@@ -618,6 +621,7 @@ def read_gbq_table(
at_time=time_travel_timestamp if enable_snapshot else None,
primary_key=primary_key,
session=self._session,
n_rows=n_rows,
)
# if we don't have a unique index, we order by row hash if we are in strict mode
if (
@@ -852,6 +856,7 @@ def read_gbq_query(
columns=columns,
use_cache=configuration["query"]["useQueryCache"],
force_total_order=force_total_order,
n_rows=query_job.result().total_rows,
# max_results and filters are omitted because they are already
# handled by to_query(), above.
)
24 changes: 19 additions & 5 deletions bigframes/session/read_api_execution.py
@@ -39,12 +39,17 @@ def execute(
ordered: bool,
peek: Optional[int] = None,
) -> Optional[executor.ExecuteResult]:
node = self._try_adapt_plan(plan, ordered)
if not node:
adapt_result = self._try_adapt_plan(plan, ordered)
if not adapt_result:
return None
node, limit = adapt_result
if node.explicitly_ordered and ordered:
return None

if limit is not None:
if peek is None or limit < peek:
peek = limit

import google.cloud.bigquery_storage_v1.types as bq_storage_types
from google.protobuf import timestamp_pb2

@@ -117,11 +122,20 @@ def _try_adapt_plan(
self,
plan: bigframe_node.BigFrameNode,
ordered: bool,
) -> Optional[nodes.ReadTableNode]:
) -> Optional[tuple[nodes.ReadTableNode, Optional[int]]]:
"""
Tries to simplify the plan to an equivalent single ReadTableNode. Otherwise, returns None.
Tries to simplify the plan to an equivalent single ReadTableNode and a limit. Otherwise, returns None.
"""
plan, limit = rewrite.pull_out_limit(plan)
# bake_order does not allow slice ops
plan = plan.bottom_up(rewrite.rewrite_slice)
if not ordered:
# gets rid of order_by ops
plan = rewrite.bake_order(plan)
return rewrite.try_reduce_to_table_scan(plan)
read_table_node = rewrite.try_reduce_to_table_scan(plan)
if read_table_node is None:
return None
if (limit is not None) and (read_table_node.source.ordering is not None):
# read api can only use physical ordering to limit, not a logical ordering
return None
return (read_table_node, limit)
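In `execute`, the limit recovered by `pull_out_limit` is folded into the caller's `peek` row cap before streaming from the Read API. A minimal sketch of that interaction (the helper name `effective_peek` is illustrative, not part of the PR):

```python
from typing import Optional


def effective_peek(peek: Optional[int], pulled_limit: Optional[int]) -> Optional[int]:
    """Row cap actually requested from the read stream: the smaller of the
    caller's peek and the LIMIT pulled out of the plan (None means unbounded)."""
    if pulled_limit is None:
        return peek
    if peek is None or pulled_limit < peek:
        return pulled_limit
    return peek


assert effective_peek(None, 300) == 300  # repr/head of a LIMIT 300 query
assert effective_peek(100, 300) == 100   # peek(100) under a LIMIT 300 query
assert effective_peek(500, 300) == 300   # the LIMIT tightens an oversized peek
```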
52 changes: 52 additions & 0 deletions tests/system/small/session/test_read_gbq_colab.py
@@ -19,6 +19,7 @@


def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_session):
executions_before_sql = maybe_ordered_session._metrics.execution_count
df = maybe_ordered_session._read_gbq_colab(
"""
SELECT
@@ -32,16 +33,67 @@ def test_read_gbq_colab_to_pandas_batches_preserves_order_by(maybe_ordered_sessi
LIMIT 300
"""
)
executions_before_python = maybe_ordered_session._metrics.execution_count
batches = df.to_pandas_batches(
page_size=100,
)
executions_after = maybe_ordered_session._metrics.execution_count

total_rows = 0
for batch in batches:
assert batch["total"].is_monotonic_decreasing
total_rows += len(batch.index)

assert total_rows > 0
assert executions_after == executions_before_python == executions_before_sql + 1


def test_read_gbq_colab_peek_avoids_requery(maybe_ordered_session):
executions_before_sql = maybe_ordered_session._metrics.execution_count
df = maybe_ordered_session._read_gbq_colab(
"""
SELECT
name,
SUM(number) AS total
FROM
`bigquery-public-data.usa_names.usa_1910_2013`
WHERE state LIKE 'W%'
GROUP BY name
ORDER BY total DESC
LIMIT 300
"""
)
executions_before_python = maybe_ordered_session._metrics.execution_count
result = df.peek(100)
executions_after = maybe_ordered_session._metrics.execution_count

# Not guaranteed by peek() in general, but expected with the read-API-based
# implementation; if this starts failing, the read API path may no longer be used.
assert result["total"].is_monotonic_decreasing

assert len(result) == 100
assert executions_after == executions_before_python == executions_before_sql + 1


def test_read_gbq_colab_repr_avoids_requery(maybe_ordered_session):
executions_before_sql = maybe_ordered_session._metrics.execution_count
df = maybe_ordered_session._read_gbq_colab(
"""
SELECT
name,
SUM(number) AS total
FROM
`bigquery-public-data.usa_names.usa_1910_2013`
WHERE state LIKE 'W%'
GROUP BY name
ORDER BY total DESC
LIMIT 300
"""
)
executions_before_python = maybe_ordered_session._metrics.execution_count
_ = repr(df)
executions_after = maybe_ordered_session._metrics.execution_count
assert executions_after == executions_before_python == executions_before_sql + 1


def test_read_gbq_colab_includes_formatted_scalars(session):