Skip to content

feat: Enable time range rolling for DataFrame, DataFrameGroupBy and SeriesGroupBy #1605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions bigframes/core/groupby/dataframe_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

from __future__ import annotations

import datetime
import typing
from typing import Literal, Sequence, Tuple, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
import numpy
import pandas as pd

from bigframes import session
Expand All @@ -30,6 +32,7 @@
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
from bigframes.core.window import rolling
import bigframes.core.window as windows
import bigframes.core.window_spec as window_specs
import bigframes.dataframe as df
Expand Down Expand Up @@ -309,28 +312,41 @@ def diff(self, periods=1) -> series.Series:
@validations.requires_ordering()
def rolling(
self,
window: int,
window: int | pd.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
min_periods=None,
on: str | None = None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> windows.Window:
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
)
skip_agg_col_id = (
None if on is None else self._block.resolve_label_exact_or_error(on)
)
return windows.Window(
block,
window_spec,
self._selected_cols,
if isinstance(window, int):
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
)
skip_agg_col_id = (
None if on is None else self._block.resolve_label_exact_or_error(on)
)
return windows.Window(
block,
window_spec,
self._selected_cols,
drop_null_groups=self._dropna,
skip_agg_column_id=skip_agg_col_id,
)

return rolling.create_range_window(
self._block,
window,
min_periods=min_periods,
value_column_ids=self._selected_cols,
on=on,
closed=closed,
is_series=False,
grouping_keys=self._by_col_ids,
drop_null_groups=self._dropna,
skip_agg_column_id=skip_agg_col_id,
)

@validations.requires_ordering()
Expand Down
44 changes: 30 additions & 14 deletions bigframes/core/groupby/series_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@

from __future__ import annotations

import datetime
import typing
from typing import Literal, Sequence, Union

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
import numpy
import pandas

from bigframes import session
from bigframes.core import expression as ex
Expand All @@ -29,6 +32,7 @@
import bigframes.core.ordering as order
import bigframes.core.utils as utils
import bigframes.core.validations as validations
from bigframes.core.window import rolling
import bigframes.core.window as windows
import bigframes.core.window_spec as window_specs
import bigframes.dataframe as df
Expand Down Expand Up @@ -246,24 +250,36 @@ def diff(self, periods=1) -> series.Series:
@validations.requires_ordering()
def rolling(
self,
window: int,
window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
min_periods=None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> windows.Window:
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
)
return windows.Window(
block,
window_spec,
[self._value_column],
drop_null_groups=self._dropna,
if isinstance(window, int):
window_spec = window_specs.WindowSpec(
bounds=window_specs.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids),
)
block = self._block.order_by(
[order.ascending_over(col) for col in self._by_col_ids],
)
return windows.Window(
block,
window_spec,
[self._value_column],
drop_null_groups=self._dropna,
is_series=True,
)

return rolling.create_range_window(
self._block,
window,
min_periods=min_periods,
value_column_ids=[self._value_column],
closed=closed,
is_series=True,
grouping_keys=self._by_col_ids,
drop_null_groups=self._dropna,
)

@validations.requires_ordering()
Expand Down
5 changes: 5 additions & 0 deletions bigframes/core/window/ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ def _(root: nodes.WindowOpNode, column_id: str):

@find_order_direction.register
def _(root: nodes.ProjectionNode, column_id: str):
for expr, ref in root.assignments:
if ref.name == column_id and isinstance(expr, ex.DerefOp):
# This source column is renamed.
return find_order_direction(root.child, expr.id.name)

return find_order_direction(root.child, column_id)
76 changes: 50 additions & 26 deletions bigframes/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,7 @@ def _apply_aggregate(
self,
op: agg_ops.UnaryAggregateOp,
):
agg_col_ids = [
col_id
for col_id in self._value_column_ids
if col_id != self._skip_agg_column_id
]
agg_block = self._aggregate_block(op, agg_col_ids)

if self._skip_agg_column_id is not None:
# Concat the skipped column to the result.
agg_block, _ = agg_block.join(
self._block.select_column(self._skip_agg_column_id), how="outer"
)
agg_block = self._aggregate_block(op)

if self._is_series:
from bigframes.series import Series
Expand All @@ -102,9 +91,12 @@ def _apply_aggregate(
]
return DataFrame(agg_block)._reindex_columns(column_labels)

def _aggregate_block(
self, op: agg_ops.UnaryAggregateOp, agg_col_ids: typing.List[str]
) -> blocks.Block:
def _aggregate_block(self, op: agg_ops.UnaryAggregateOp) -> blocks.Block:
agg_col_ids = [
col_id
for col_id in self._value_column_ids
if col_id != self._skip_agg_column_id
]
block, result_ids = self._block.multi_apply_window_op(
agg_col_ids,
op,
Expand All @@ -123,39 +115,71 @@ def _aggregate_block(
block = block.set_index(col_ids=index_ids)

labels = [self._block.col_id_to_label[col] for col in agg_col_ids]
if self._skip_agg_column_id is not None:
result_ids = [self._skip_agg_column_id, *result_ids]
labels.insert(0, self._block.col_id_to_label[self._skip_agg_column_id])

return block.select_columns(result_ids).with_column_labels(labels)


def create_range_window(
block: blocks.Block,
window: pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
*,
value_column_ids: typing.Sequence[str] = tuple(),
min_periods: int | None,
on: str | None = None,
closed: typing.Literal["right", "left", "both", "neither"],
is_series: bool,
grouping_keys: typing.Sequence[str] = tuple(),
drop_null_groups: bool = True,
) -> Window:

index_dtypes = block.index.dtypes
if len(index_dtypes) > 1:
raise ValueError("Range rolling on MultiIndex is not supported")
if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
raise ValueError("Index type should be timestamps with timezones")
if on is None:
# Rolling on index
index_dtypes = block.index.dtypes
if len(index_dtypes) > 1:
raise ValueError("Range rolling on MultiIndex is not supported")
if index_dtypes[0] != dtypes.TIMESTAMP_DTYPE:
raise ValueError("Index type should be timestamps with timezones")
rolling_key_col_id = block.index_columns[0]
else:
# Rolling on a specific column
rolling_key_col_id = block.resolve_label_exact_or_error(on)
if block.expr.get_column_type(rolling_key_col_id) != dtypes.TIMESTAMP_DTYPE:
raise ValueError(f"Column {on} type should be timestamps with timezones")

order_direction = window_ordering.find_order_direction(
block.expr.node, block.index_columns[0]
block.expr.node, rolling_key_col_id
)
if order_direction is None:
target_str = "index" if on is None else f"column {on}"
raise ValueError(
"The index might not be in a monotonic order. Please sort the index before rolling."
f"The {target_str} might not be in a monotonic order. Please sort by {target_str} before rolling."
)
if isinstance(window, str):
window = pandas.Timedelta(window)
spec = window_spec.WindowSpec(
bounds=window_spec.RangeWindowBounds.from_timedelta_window(window, closed),
min_periods=1 if min_periods is None else min_periods,
ordering=(
ordering.OrderingExpression(
ex.deref(block.index_columns[0]), order_direction
),
ordering.OrderingExpression(ex.deref(rolling_key_col_id), order_direction),
),
grouping_keys=tuple(ex.deref(col) for col in grouping_keys),
)

selected_value_col_ids = (
value_column_ids if value_column_ids else block.value_columns
)
# This step must be done after finding the order direction of the window key.
if grouping_keys:
block = block.order_by([ordering.ascending_over(col) for col in grouping_keys])

return Window(
block,
spec,
value_column_ids=selected_value_col_ids,
is_series=is_series,
skip_agg_column_id=None if on is None else rolling_key_col_id,
drop_null_groups=drop_null_groups,
)
return Window(block, spec, block.value_columns, is_series=is_series)
36 changes: 23 additions & 13 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@
import bigframes.core.utils as utils
import bigframes.core.validations as validations
import bigframes.core.window
from bigframes.core.window import rolling
import bigframes.core.window_spec as windows
import bigframes.dtypes
import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.operations as ops
import bigframes.operations.aggregations
import bigframes.operations.aggregations as agg_ops
import bigframes.operations.ai
import bigframes.operations.plotting as plotting
Expand Down Expand Up @@ -3393,23 +3393,33 @@ def _perform_join_by_index(
@validations.requires_ordering()
def rolling(
self,
window: int,
window: int | pandas.Timedelta | numpy.timedelta64 | datetime.timedelta | str,
min_periods=None,
on: str | None = None,
closed: Literal["right", "left", "both", "neither"] = "right",
) -> bigframes.core.window.Window:
window_def = windows.WindowSpec(
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
)
skip_agg_col_id = (
None if on is None else self._block.resolve_label_exact_or_error(on)
)
return bigframes.core.window.Window(
if isinstance(window, int):
window_def = windows.WindowSpec(
bounds=windows.RowsWindowBounds.from_window_size(window, closed),
min_periods=min_periods if min_periods is not None else window,
)
skip_agg_col_id = (
None if on is None else self._block.resolve_label_exact_or_error(on)
)
return bigframes.core.window.Window(
self._block,
window_def,
self._block.value_columns,
skip_agg_column_id=skip_agg_col_id,
)

return rolling.create_range_window(
self._block,
window_def,
self._block.value_columns,
skip_agg_column_id=skip_agg_col_id,
window,
min_periods=min_periods,
on=on,
closed=closed,
is_series=False,
)

@validations.requires_ordering()
Expand Down
6 changes: 5 additions & 1 deletion bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,11 @@ def rolling(
)

return rolling.create_range_window(
self._block, window, min_periods, closed, is_series=True
block=self._block,
window=window,
min_periods=min_periods,
closed=closed,
is_series=True,
)

@validations.requires_ordering()
Expand Down
Loading