From 848f86a21227b1c0fa6ae4b8a06e7a87134e2ff9 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Thu, 27 Mar 2025 21:32:09 +0000 Subject: [PATCH 1/6] feat: add parameter in dataframe/groupby.rolling() --- bigframes/core/blocks.py | 9 +- bigframes/core/groupby/dataframe_group_by.py | 4 +- bigframes/core/groupby/series_group_by.py | 2 +- bigframes/core/window_spec.py | 12 ++- bigframes/dataframe.py | 4 +- bigframes/series.py | 4 +- tests/system/small/test_window.py | 88 ++++++++++++-------- 7 files changed, 78 insertions(+), 45 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 2992718412..045c679e3c 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -987,7 +987,7 @@ def apply_nary_op( def multi_apply_window_op( self, columns: typing.Sequence[str], - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec, *, skip_null_groups: bool = False, @@ -1058,7 +1058,7 @@ def project_exprs( def apply_window_op( self, column: str, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec, *, result_label: Label = None, @@ -1066,6 +1066,11 @@ def apply_window_op( skip_reproject_unsafe: bool = False, never_skip_nulls: bool = False, ) -> typing.Tuple[Block, str]: + if column == window_spec.on: + # For row-based window, do nothing + # TODO(b/388916840) Support range rolling with "on" + return self, column + block = self if skip_null_groups: for key in window_spec.grouping_keys: diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index b97a5f4c48..df41bfd784 100644 --- a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -310,12 +310,14 @@ def rolling( self, window: int, min_periods=None, + on: str | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> windows.Window: window_spec = window_specs.WindowSpec( bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), min_periods=min_periods if min_periods is not None else window, grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), + on=None if on is None else self._block.resolve_label_exact_or_error(on), ) block = self._block.order_by( [order.ascending_over(col) for col in self._by_col_ids], @@ -511,7 +513,7 @@ def _aggregate_all( def _apply_window_op( self, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window: typing.Optional[window_specs.WindowSpec] = None, numeric_only: bool = False, ): diff --git a/bigframes/core/groupby/series_group_by.py b/bigframes/core/groupby/series_group_by.py index 761a02bd34..6d3c1252de 100644 --- a/bigframes/core/groupby/series_group_by.py +++ b/bigframes/core/groupby/series_group_by.py @@ -294,7 +294,7 @@ def _aggregate(self, aggregate_op: agg_ops.UnaryAggregateOp) -> series.Series: def _apply_window_op( self, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, discard_name=False, window: typing.Optional[window_specs.WindowSpec] = None, never_skip_nulls: bool = False, diff --git a/bigframes/core/window_spec.py b/bigframes/core/window_spec.py index 142e3a7e00..606499715b 100644 --- a/bigframes/core/window_spec.py +++ b/bigframes/core/window_spec.py @@ -187,16 +187,20 @@ def __post_init__(self): class WindowSpec: """ Specifies a window over which aggregate and analytic function may be applied. - grouping_keys: set of column ids to group on - preceding: Number of preceding rows in the window - following: Number of preceding rows in the window - ordering: List of columns ids and ordering direction to override base ordering + + Attributes: + grouping_keys: A set of column ids to group on + bounds: The window boundaries + ordering: A list of columns ids and ordering direction to override base ordering + min_periods: The minimum number of observations in window required to have a value + on: The id of the column on which to calculate the rolling window """ grouping_keys: Tuple[ex.DerefOp, ...] = tuple() ordering: Tuple[orderings.OrderingExpression, ...] = tuple() bounds: Union[RowsWindowBounds, RangeWindowBounds, None] = None min_periods: int = 0 + on: str | None = None @property def row_bounded(self): diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 7f9e62b7dd..fa3b428471 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3312,11 +3312,13 @@ def rolling( self, window: int, min_periods=None, + on: str | None = None, closed: Literal["right", "left", "both", "neither"] = "right", ) -> bigframes.core.window.Window: window_def = windows.WindowSpec( bounds=windows.RowsWindowBounds.from_window_size(window, closed), min_periods=min_periods if min_periods is not None else window, + on=None if on is None else self._block.resolve_label_exact_or_error(on), ) return bigframes.core.window.Window( self._block, window_def, self._block.value_columns @@ -3483,7 +3485,7 @@ def pct_change(self, periods: int = 1) -> DataFrame: def _apply_window_op( self, - op: agg_ops.WindowOp, + op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec, ): block, result_ids = self._block.multi_apply_window_op( diff --git a/bigframes/series.py b/bigframes/series.py index be87129929..d2a3dcf78f 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1378,7 +1378,9 @@ def _apply_aggregation( ) -> Any: return self._block.get_stat(self._value_column, op) - def _apply_window_op(self, op: agg_ops.WindowOp, window_spec: windows.WindowSpec): + def _apply_window_op( + self, op: agg_ops.UnaryWindowOp, window_spec: windows.WindowSpec + ): block = self._block block, result_id = block.apply_window_op( self._value_column, op, window_spec=window_spec, result_label=self.name diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index 68613f1372..3ae277f9b5 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -20,12 +20,9 @@ def rolling_dfs(scalars_dfs): bf_df, pd_df = scalars_dfs - target_cols = ["int64_too", "float64_col", "bool_col"] + target_cols = ["int64_too", "float64_col", "int64_col"] - bf_df = bf_df[target_cols].set_index("bool_col") - pd_df = pd_df[target_cols].set_index("bool_col") - - return bf_df, pd_df + return bf_df[target_cols], pd_df[target_cols] @pytest.fixture(scope="module") @@ -49,31 +46,66 @@ def test_dataframe_rolling_closed_param(rolling_dfs, closed): @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): bf_df, pd_df = rolling_dfs + # Need to specify column subset for comparison due to b/406841327 + check_columns = ["float64_col", "int64_col"] actual_result = ( - bf_df.groupby(level=0).rolling(window=3, closed=closed).sum().to_pandas() + bf_df.groupby(bf_df["int64_too"] % 2) + .rolling(window=3, closed=closed) + .sum() + .to_pandas() ) - expected_result = pd_df.groupby(level=0).rolling(window=3, closed=closed).sum() - pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) + expected_result = ( + pd_df.groupby(pd_df["int64_too"] % 2).rolling(window=3, closed=closed).sum() + ) + pd.testing.assert_frame_equal( + actual_result[check_columns], expected_result, check_dtype=False + ) -def test_dataframe_rolling_default_closed_param(rolling_dfs): +@pytest.mark.parametrize("on", ["int64_too", "float64_col"]) +def test_dataframe_rolling_on(rolling_dfs, on): bf_df, pd_df = rolling_dfs - actual_result = bf_df.rolling(window=3).sum().to_pandas() + actual_result = bf_df.rolling(window=3, on=on).sum().to_pandas() - expected_result = pd_df.rolling(window=3).sum() + expected_result = pd_df.rolling(window=3, on=on).sum() pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) -def test_dataframe_groupby_rolling_default_closed_param(rolling_dfs): +def test_dataframe_rolling_on_invalid_column_raise_error(rolling_dfs): + bf_df, _ = rolling_dfs + + with pytest.raises(ValueError): + bf_df.rolling(window=3, on="whatever").sum() + + +def test_dataframe_groupby_rolling_on(rolling_dfs): bf_df, pd_df = rolling_dfs + # Need to specify column subset for comparison due to b/406841327 + check_columns = ["float64_col", "int64_col"] - actual_result = bf_df.groupby(level=0).rolling(window=3).sum().to_pandas() + actual_result = ( + bf_df.groupby(bf_df["int64_too"] % 2) + .rolling(window=3, on="float64_col") + .sum() + .to_pandas() + ) - expected_result = pd_df.groupby(level=0).rolling(window=3).sum() - pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) + expected_result = ( + pd_df.groupby(pd_df["int64_too"] % 2).rolling(window=3, on="float64_col").sum() + ) + pd.testing.assert_frame_equal( + actual_result[check_columns], expected_result, check_dtype=False + ) + + +def test_dataframe_groupby_rolling_on_invalid_column_raise_error(rolling_dfs): + bf_df, _ = rolling_dfs + + with pytest.raises(ValueError): + bf_df.groupby(level=0).rolling(window=3, on="whatever").sum() @pytest.mark.parametrize("closed", ["left", "right", "both", "neither"]) @@ -103,24 +135,6 @@ def test_series_groupby_rolling_closed_param(rolling_series, closed): pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False) -def test_series_rolling_default_closed_param(rolling_series): - bf_series, df_series = rolling_series - - actual_result = bf_series.rolling(window=3).sum().to_pandas() - - expected_result = df_series.rolling(window=3).sum() - pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False) - - -def test_series_groupby_rolling_default_closed_param(rolling_series): - bf_series, df_series = rolling_series - - actual_result = bf_series.groupby(bf_series % 2).rolling(window=3).sum().to_pandas() - - expected_result = df_series.groupby(df_series % 2).rolling(window=3).sum() - pd.testing.assert_series_equal(actual_result, expected_result, check_dtype=False) - - @pytest.mark.parametrize( ("windowing"), [ @@ -181,8 +195,12 @@ def test_series_window_agg_ops(rolling_series, windowing, agg_op): pytest.param(lambda x: x.var(), id="var"), ], ) -def test_dataframe_window_agg_ops(rolling_dfs, windowing, agg_op): - bf_df, pd_df = rolling_dfs +def test_dataframe_window_agg_ops(scalars_dfs, windowing, agg_op): + bf_df, pd_df = scalars_dfs + target_columns = ["int64_too", "float64_col", "bool_col"] + index_column = "bool_col" + bf_df = bf_df[target_columns].set_index(index_column) + pd_df = pd_df[target_columns].set_index(index_column) bf_result = agg_op(windowing(bf_df)).to_pandas() From fca5b459722ec9df863a5353e8d1fddfe9740ddc Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Fri, 28 Mar 2025 21:10:34 +0000 Subject: [PATCH 2/6] fix test --- tests/system/small/test_window.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/system/small/test_window.py b/tests/system/small/test_window.py index 3ae277f9b5..bfe97f5103 100644 --- a/tests/system/small/test_window.py +++ b/tests/system/small/test_window.py @@ -64,13 +64,12 @@ def test_dataframe_groupby_rolling_closed_param(rolling_dfs, closed): ) -@pytest.mark.parametrize("on", ["int64_too", "float64_col"]) -def test_dataframe_rolling_on(rolling_dfs, on): +def test_dataframe_rolling_on(rolling_dfs): bf_df, pd_df = rolling_dfs - actual_result = bf_df.rolling(window=3, on=on).sum().to_pandas() + actual_result = bf_df.rolling(window=3, on="int64_too").sum().to_pandas() - expected_result = pd_df.rolling(window=3, on=on).sum() + expected_result = pd_df.rolling(window=3, on="int64_too").sum() pd.testing.assert_frame_equal(actual_result, expected_result, check_dtype=False) From 963eba515cd0ca7f1c536308be6cdd95501fb512 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Mon, 31 Mar 2025 20:19:41 +0000 Subject: [PATCH 3/6] add docs --- .../bigframes_vendored/pandas/core/generic.py | 12 ++++++++++++ .../pandas/core/groupby/__init__.py | 10 ++++++++++ 2 files changed, 22 insertions(+) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 8dd43fd8da..451a6bdc2e 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1052,6 +1052,8 @@ def rolling( self, window, min_periods: int | None = None, + on: str | None = None, + closed: Literal["right", "left", "both", "neither"] = "right", ): """ Provide rolling window calculations. @@ -1083,6 +1085,16 @@ def rolling( For a window that is specified by an integer, ``min_periods`` will default to the size of the window. + on (str, optional): + For a DataFrame, a column label or Index level on which to calculate the rolling window, + rather than the DataFrame’s index. + + closed (str, default 'right'): + If 'right', the first point in the window is excluded from calculations. + If 'left', the last point in the window is excluded from calculations. + If 'both', the no points in the window are excluded from calculations. + If 'neither', the first and last points in the window are excluded from calculations. + Returns: bigframes.core.window.Window: ``Window`` subclass if a ``win_type`` is passed. ``Rolling`` subclass if ``win_type`` is not passed. diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index 31a9aa6a93..23ac0f1a45 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -1035,6 +1035,16 @@ def rolling(self, *args, **kwargs): For a window that is specified by an integer, ``min_periods`` will default to the size of the window. + on (str, optional): + For a DataFrame, a column label or Index level on which to calculate the rolling window, + rather than the DataFrame’s index. + + closed (str, default 'right'): + If 'right', the first point in the window is excluded from calculations. + If 'left', the last point in the window is excluded from calculations. + If 'both', the no points in the window are excluded from calculations. + If 'neither', the first and last points in the window are excluded from calculations. + Returns: bigframes.pandas.DataFrame or bigframes.pandas.Series: Return a new grouper with our rolling appended. From 50df383051492deea87e68a0dcfd1c4ac799d595 Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 1 Apr 2025 17:40:42 +0000 Subject: [PATCH 4/6] update doc --- .../bigframes_vendored/pandas/core/generic.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index 451a6bdc2e..a5a6baf854 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1058,35 +1058,36 @@ def rolling( """ Provide rolling window calculations. + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + + >>> s = bpd.Series([0,1,2,3,4]) + >>> s.rolling(window=3).min() + 0 + 1 + 2 0 + 3 1 + 4 2 + dtype: Int64 + Args: - window (int, timedelta, str, offset, or BaseIndexer subclass): + window (int): Size of the moving window. If an integer, the fixed number of observations used for each window. - If a timedelta, str, or offset, the time period of each window. Each - window will be a variable sized based on the observations included in - the time-period. This is only valid for datetime-like indexes. - To learn more about the offsets & frequency strings, please see `this link - `__. - - If a BaseIndexer subclass, the window boundaries - based on the defined ``get_window_bounds`` method. Additional rolling - keyword arguments, namely ``min_periods``, ``center``, ``closed`` and - ``step`` will be passed to ``get_window_bounds``. - min_periods (int, default None): Minimum number of observations in window required to have a value; otherwise, result is ``np.nan``. - For a window that is specified by an offset, ``min_periods`` will default to 1. - For a window that is specified by an integer, ``min_periods`` will default to the size of the window. on (str, optional): - For a DataFrame, a column label or Index level on which to calculate the rolling window, + For a DataFrame, a column label on which to calculate the rolling window, rather than the DataFrame’s index. closed (str, default 'right'): From 1a1089f5de9bb19c63481cfdf088496f17a0284f Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 1 Apr 2025 19:05:08 +0000 Subject: [PATCH 5/6] update docs --- third_party/bigframes_vendored/pandas/core/generic.py | 10 ++++++++++ .../bigframes_vendored/pandas/core/groupby/__init__.py | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/third_party/bigframes_vendored/pandas/core/generic.py b/third_party/bigframes_vendored/pandas/core/generic.py index a5a6baf854..ee2400dfd8 100644 --- a/third_party/bigframes_vendored/pandas/core/generic.py +++ b/third_party/bigframes_vendored/pandas/core/generic.py @@ -1072,6 +1072,16 @@ def rolling( 4 2 dtype: Int64 + >>> df = bpd.DataFrame({'A': [0,1,2,3], 'B': [0,2,4,6]}) + >>> df.rolling(window=2, on='A', closed='both').sum() + A B + 0 0 + 1 1 2 + 2 2 6 + 3 3 12 + + [4 rows x 2 columns] + Args: window (int): Size of the moving window. diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index 23ac0f1a45..1b5bd308e3 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -1036,7 +1036,7 @@ def rolling(self, *args, **kwargs): to the size of the window. on (str, optional): - For a DataFrame, a column label or Index level on which to calculate the rolling window, + For a DataFrame, a column label on which to calculate the rolling window, rather than the DataFrame’s index. closed (str, default 'right'): From d9888d2945b3b6e91b1b12f3821dc7bb53066c2f Mon Sep 17 00:00:00 2001 From: Shenyang Cai Date: Tue, 1 Apr 2025 22:24:00 +0000 Subject: [PATCH 6/6] remove on param from windowspec --- bigframes/core/blocks.py | 5 -- bigframes/core/groupby/dataframe_group_by.py | 10 +++- bigframes/core/window/rolling.py | 51 ++++++++++++++------ bigframes/dataframe.py | 9 +++- 4 files changed, 52 insertions(+), 23 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 045c679e3c..f9bb44ca61 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1066,11 +1066,6 @@ def apply_window_op( skip_reproject_unsafe: bool = False, never_skip_nulls: bool = False, ) -> typing.Tuple[Block, str]: - if column == window_spec.on: - # For row-based window, do nothing - # TODO(b/388916840) Support range rolling with "on" - return self, column - block = self if skip_null_groups: for key in window_spec.grouping_keys: diff --git a/bigframes/core/groupby/dataframe_group_by.py b/bigframes/core/groupby/dataframe_group_by.py index df41bfd784..bfeca32945 100644 --- a/bigframes/core/groupby/dataframe_group_by.py +++ b/bigframes/core/groupby/dataframe_group_by.py @@ -317,13 +317,19 @@ def rolling( bounds=window_specs.RowsWindowBounds.from_window_size(window, closed), min_periods=min_periods if min_periods is not None else window, grouping_keys=tuple(ex.deref(col) for col in self._by_col_ids), - on=None if on is None else self._block.resolve_label_exact_or_error(on), ) block = self._block.order_by( [order.ascending_over(col) for col in self._by_col_ids], ) + skip_agg_col_id = ( + None if on is None else self._block.resolve_label_exact_or_error(on) + ) return windows.Window( - block, window_spec, self._selected_cols, drop_null_groups=self._dropna + block, + window_spec, + self._selected_cols, + drop_null_groups=self._dropna, + skip_agg_column_id=skip_agg_col_id, ) @validations.requires_ordering() diff --git a/bigframes/core/window/rolling.py b/bigframes/core/window/rolling.py index 7758145fd4..b10b2da123 100644 --- a/bigframes/core/window/rolling.py +++ b/bigframes/core/window/rolling.py @@ -34,12 +34,16 @@ def __init__( value_column_ids: typing.Sequence[str], drop_null_groups: bool = True, is_series: bool = False, + skip_agg_column_id: str | None = None, ): self._block = block self._window_spec = window_spec self._value_column_ids = value_column_ids self._drop_null_groups = drop_null_groups self._is_series = is_series + # The column ID that won't be aggregated on. + # This is equivalent to pandas `on` parameter in rolling() + self._skip_agg_column_id = skip_agg_column_id def count(self): return self._apply_aggregate(agg_ops.count_op) @@ -66,10 +70,37 @@ def _apply_aggregate( self, op: agg_ops.UnaryAggregateOp, ): - block = self._block - labels = [block.col_id_to_label[col] for col in self._value_column_ids] - block, result_ids = block.multi_apply_window_op( - self._value_column_ids, + agg_col_ids = [ + col_id + for col_id in self._value_column_ids + if col_id != self._skip_agg_column_id + ] + agg_block = self._aggregate_block(op, agg_col_ids) + + if self._skip_agg_column_id is not None: + # Concat the skipped column to the result. + agg_block, _ = agg_block.join( + self._block.select_column(self._skip_agg_column_id), how="outer" + ) + + if self._is_series: + from bigframes.series import Series + + return Series(agg_block) + else: + from bigframes.dataframe import DataFrame + + # Preserve column order. + column_labels = [ + self._block.col_id_to_label[col_id] for col_id in self._value_column_ids + ] + return DataFrame(agg_block)._reindex_columns(column_labels) + + def _aggregate_block( + self, op: agg_ops.UnaryAggregateOp, agg_col_ids: typing.List[str] + ) -> blocks.Block: + block, result_ids = self._block.multi_apply_window_op( + agg_col_ids, op, self._window_spec, skip_null_groups=self._drop_null_groups, @@ -85,13 +116,5 @@ def _apply_aggregate( ) block = block.set_index(col_ids=index_ids) - if self._is_series: - from bigframes.series import Series - - return Series(block.select_columns(result_ids).with_column_labels(labels)) - else: - from bigframes.dataframe import DataFrame - - return DataFrame( - block.select_columns(result_ids).with_column_labels(labels) - ) + labels = [self._block.col_id_to_label[col] for col in agg_col_ids] + return block.select_columns(result_ids).with_column_labels(labels) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index fa3b428471..bc56a53196 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -3318,10 +3318,15 @@ def rolling( window_def = windows.WindowSpec( bounds=windows.RowsWindowBounds.from_window_size(window, closed), min_periods=min_periods if min_periods is not None else window, - on=None if on is None else self._block.resolve_label_exact_or_error(on), + ) + skip_agg_col_id = ( + None if on is None else self._block.resolve_label_exact_or_error(on) ) return bigframes.core.window.Window( - self._block, window_def, self._block.value_columns + self._block, + window_def, + self._block.value_columns, + skip_agg_column_id=skip_agg_col_id, ) @validations.requires_ordering()