Skip to content

feat: Add quantile statistic #613

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 16, 2024
3 changes: 3 additions & 0 deletions bigframes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,6 @@
LEP_ENABLED_BIGQUERY_LOCATIONS = frozenset(
ALL_BIGQUERY_LOCATIONS - REP_ENABLED_BIGQUERY_LOCATIONS
)

# BigQuery's default column limit is 10000; MAX_COLUMNS stays 100 below it
# to leave headroom for overhead.
MAX_COLUMNS = 9900
34 changes: 34 additions & 0 deletions bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import functools
import typing
from typing import Sequence

import pandas as pd

Expand Down Expand Up @@ -105,6 +106,39 @@ def indicate_duplicates(
)


def quantile(
block: blocks.Block,
columns: Sequence[str],
qs: Sequence[float],
grouping_column_ids: Sequence[str] = (),
) -> blocks.Block:
# TODO: handle windowing and more interpolation methods
window = core.WindowSpec(
grouping_keys=tuple(grouping_column_ids),
)
quantile_cols = []
labels = []
if len(columns) * len(qs) > constants.MAX_COLUMNS:
raise NotImplementedError("Too many aggregates requested.")
for col in columns:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this throw an exception for a DataFrame when the number of columns is larger than 30?

if len(self.value_columns) > 30:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added limit

for q in qs:
label = block.col_id_to_label[col]
new_label = (*label, q) if isinstance(label, tuple) else (label, q)
labels.append(new_label)
block, quantile_col = block.apply_window_op(
col,
agg_ops.QuantileOp(q),
window_spec=window,
)
quantile_cols.append(quantile_col)
block, results = block.aggregate(
grouping_column_ids,
tuple((col, agg_ops.AnyValueOp()) for col in quantile_cols),
dropna=True,
)
return block.select_columns(results).with_column_labels(labels)


def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block:
supported_methods = [
"linear",
Expand Down
13 changes: 9 additions & 4 deletions bigframes/core/blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,12 +1498,17 @@ def stack(self, how="left", levels: int = 1):

row_label_tuples = utils.index_as_tuples(row_labels)

if col_labels is not None:
if col_labels is None:
result_index: pd.Index = pd.Index([None])
result_col_labels: Sequence[Tuple] = list([()])
elif (col_labels.nlevels == 1) and all(
col_labels.isna()
): # isna not implemented for MultiIndex for newer pandas versions
result_index = pd.Index([None])
result_col_labels = utils.index_as_tuples(col_labels.drop_duplicates())
else:
result_index = col_labels.drop_duplicates().dropna(how="all")
result_col_labels = utils.index_as_tuples(result_index)
else:
result_index = pd.Index([None])
result_col_labels = list([()])

# Get matching columns
unpivot_columns: List[Tuple[str, List[str]]] = []
Expand Down
8 changes: 8 additions & 0 deletions bigframes/core/compile/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ def _(
return cast(ibis_types.NumericValue, value)


@compile_unary_agg.register
@numeric_op
def _(
    op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None
) -> ibis_types.NumericValue:
    """Compile ``QuantileOp`` into an ibis ``quantile`` expression at ``op.q``.

    The expression is handed to ``_apply_window_if_present`` together with
    ``window`` before being returned.
    """
    quantile_expr = column.quantile(op.q)
    return _apply_window_if_present(quantile_expr, window)


@compile_unary_agg.register
@numeric_op
def _(
Expand Down
57 changes: 51 additions & 6 deletions bigframes/core/groupby/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations

import typing
from typing import Sequence, Union

import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby
import pandas as pd
Expand Down Expand Up @@ -115,14 +116,35 @@ def mean(self, numeric_only: bool = False, *args) -> df.DataFrame:
def median(
    self, numeric_only: bool = False, *, exact: bool = False
) -> df.DataFrame:
    """Return the median of the selected columns for each group.

    Args:
        numeric_only: When False, ``_raise_on_non_numeric("median")`` is
            called first to validate the selected columns.
        exact: When True, delegate to ``quantile(0.5)``. When False,
            aggregate with ``agg_ops.median_op``.

    Returns:
        The result of ``quantile(0.5)`` or of the ``median_op`` aggregation.
    """
    if not numeric_only:
        self._raise_on_non_numeric("median")
    if exact:
        # The earlier unconditional NotImplementedError for ``exact`` is gone:
        # it made this branch unreachable even though quantile() exists.
        return self.quantile(0.5)
    return self._aggregate_all(agg_ops.median_op, numeric_only=True)

def quantile(
    self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> df.DataFrame:
    """Compute the requested quantile(s) of the selected columns per group.

    Args:
        q: A single quantile or a list-like of quantiles.
        numeric_only: When False, ``_raise_on_non_numeric("quantile")`` is
            called before any work is done.

    Returns:
        A DataFrame built from ``block_ops.quantile``. For list-like ``q`` the
        frame is stacked; for scalar ``q`` the last column level is dropped.
    """
    if not numeric_only:
        self._raise_on_non_numeric("quantile")

    # Only columns whose type is in the permissive numeric set are aggregated.
    numeric_cols = tuple(
        col_id
        for col_id in self._selected_cols
        if self._column_type(col_id) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE
    )

    is_multi = utils.is_list_like(q)
    if is_multi:
        quantiles = tuple(q)  # type: ignore
    else:
        quantiles = (q,)

    quantile_block = block_ops.quantile(
        self._block,
        numeric_cols,
        qs=quantiles,
        grouping_column_ids=self._by_col_ids,
    )
    frame = df.DataFrame(quantile_block)
    if not is_multi:
        return frame.droplevel(-1, 1)
    return frame.stack()

def min(self, numeric_only: bool = False, *args) -> df.DataFrame:
return self._aggregate_all(agg_ops.min_op, numeric_only=numeric_only)

Expand Down Expand Up @@ -466,8 +488,31 @@ def sum(self, *args) -> series.Series:
def mean(self, *args) -> series.Series:
return self._aggregate(agg_ops.mean_op)

def median(
    self,
    *args,
    exact: bool = False,
    **kwargs,
) -> series.Series:
    """Return the median of the grouped series for each group.

    The superseded one-line definition, which aggregated with
    ``agg_ops.mean_op``, has been dropped so only this definition remains.

    Args:
        *args: Accepted and ignored.
        exact: When True, delegate to ``quantile(0.5)``. When False,
            aggregate with ``agg_ops.median_op``.
        **kwargs: Accepted and ignored.
    """
    if exact:
        return self.quantile(0.5)
    else:
        return self._aggregate(agg_ops.median_op)

def quantile(
    self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
) -> series.Series:
    """Compute the requested quantile(s) of the grouped series per group.

    Args:
        q: A single quantile or a list-like of quantiles.
        numeric_only: Accepted but not read by this method.

    Returns:
        A Series built from the stacked ``block_ops.quantile`` result. For
        scalar ``q`` the last index level is dropped.
    """
    is_multi = utils.is_list_like(q)
    quantiles = tuple(q) if is_multi else (q,)  # type: ignore
    quantile_block = block_ops.quantile(
        self._block,
        (self._value_column,),
        qs=quantiles,
        grouping_column_ids=self._by_col_ids,
    )
    stacked = series.Series(quantile_block.stack())
    if is_multi:
        return stacked
    return stacked.droplevel(-1)

def std(self, *args, **kwargs) -> series.Series:
return self._aggregate(agg_ops.std_op)
Expand Down
30 changes: 28 additions & 2 deletions bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2009,8 +2009,34 @@ def median(
frame = self._raise_on_non_numeric("median")
else:
frame = self._drop_non_numeric()
block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
return bigframes.series.Series(block.select_column("values"))
if exact:
return self.quantile()
else:
block = frame._block.aggregate_all_and_stack(agg_ops.median_op)
return bigframes.series.Series(block.select_column("values"))

def quantile(
    self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False
):
    """Compute the requested quantile(s) of each column.

    Args:
        q: A single quantile or a list-like of quantiles.
        numeric_only: When False, the frame comes from
            ``_raise_on_non_numeric("quantile")``. When True, it comes from
            ``_drop_non_numeric()``.

    Returns:
        For list-like ``q``, a DataFrame built from the stacked result with
        index level 0 dropped. For scalar ``q``, a Series whose ``name`` is
        set to ``q``.
    """
    if not numeric_only:
        # Pass this operation's own name; it used to pass "median", a
        # copy-paste from median(). The groupby quantile passes "quantile".
        frame = self._raise_on_non_numeric("quantile")
    else:
        frame = self._drop_non_numeric()
    multi_q = utils.is_list_like(q)
    result = block_ops.quantile(
        frame._block, frame._block.value_columns, qs=tuple(q) if multi_q else (q,)  # type: ignore
    )
    if multi_q:
        return DataFrame(result.stack()).droplevel(0)
    else:
        # Stack every column level so the column labels become the index.
        result_df = (
            DataFrame(result)
            .stack(list(range(0, frame.columns.nlevels)))
            .droplevel(0)
        )
        result_series = bigframes.series.Series(result_df._block)
        result_series.name = q
        return result_series

def std(
self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False
Expand Down
12 changes: 12 additions & 0 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
return input_types[0]


@dataclasses.dataclass(frozen=True)
class QuantileOp(UnaryAggregateOp):
    """Unary aggregate op parameterized by the quantile ``q``."""

    # The quantile to compute; ``name`` renders it as a percentage (q * 100).
    q: float

    @property
    def name(self):
        """Percent label for this quantile, e.g. ``"50%"`` for ``q=0.5``."""
        # int(q * 100) truncated floating-point error (0.29 * 100 is
        # 28.999999999999996, which gave "28%") and collapsed fractional
        # percents, so format with "g": 0.29 -> "29%", 0.335 -> "33.5%".
        return f"{self.q * 100:g}%"

    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
        """Return the ``UNARY_REAL_NUMERIC`` output type for the first input type."""
        return signatures.UNARY_REAL_NUMERIC.output_type(input_types[0])


@dataclasses.dataclass(frozen=True)
class ApproxQuartilesOp(UnaryAggregateOp):
quartile: int
Expand Down
19 changes: 14 additions & 5 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import os
import textwrap
import typing
from typing import Any, Literal, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union

import bigframes_vendored.pandas.core.series as vendored_pandas_series
import google.cloud.bigquery as bigquery
Expand Down Expand Up @@ -968,10 +968,19 @@ def mean(self) -> float:

def median(self, *, exact: bool = False) -> float:
    """Return the median of the series.

    Args:
        exact: When True, delegate to ``quantile(0.5)``. When False,
            aggregate with ``agg_ops.median_op``.
    """
    if exact:
        # The stale NotImplementedError and the unconditional return that
        # preceded this line are gone; they made exact=True always raise.
        return typing.cast(float, self.quantile(0.5))
    else:
        return typing.cast(float, self._apply_aggregation(agg_ops.median_op))

def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, float]:
    """Compute the requested quantile(s) of the series.

    Args:
        q: A single quantile or a list-like of quantiles.

    Returns:
        For list-like ``q``, a Series built from the stacked result with its
        first index level dropped. For scalar ``q``, the squeezed local
        (pandas) value.
    """
    is_multi = utils.is_list_like(q)
    quantiles = tuple(q) if is_multi else (q,)
    quantile_block = block_ops.quantile(
        self._block, (self._value_column,), qs=quantiles
    )
    if not is_multi:
        return cast(float, Series(quantile_block).to_pandas().squeeze())
    stacked = quantile_block.stack()
    trimmed = stacked.drop_levels([stacked.index_columns[0]])
    return Series(trimmed)

def sum(self) -> float:
return typing.cast(float, self._apply_aggregation(agg_ops.sum_op))
Expand Down
30 changes: 29 additions & 1 deletion tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2504,7 +2504,10 @@ def test_df_melt_default(scalars_dfs):

# Pandas produces int64 index, Bigframes produces Int64 (nullable)
pd.testing.assert_frame_equal(
bf_result, pd_result, check_index_type=False, check_dtype=False
bf_result,
pd_result,
check_index_type=False,
check_dtype=False,
)


Expand Down Expand Up @@ -3029,6 +3032,31 @@ def test_dataframe_aggregates_median(scalars_df_index, scalars_pandas_df_index):
)


def test_dataframe_aggregates_quantile_mono(scalars_df_index, scalars_pandas_df_index):
    """A scalar ``q`` on a DataFrame matches the pandas result as a Series."""
    quantile = 0.45
    columns = ["int64_too", "int64_col", "float64_col"]

    actual = scalars_df_index[columns].quantile(q=quantile).to_pandas()
    expected = scalars_pandas_df_index[columns].quantile(q=quantile)

    # Pandas may produce narrower numeric types, but bigframes always produces Float64
    expected = expected.astype("Float64")

    pd.testing.assert_series_equal(actual, expected, check_index_type=False)


def test_dataframe_aggregates_quantile_multi(scalars_df_index, scalars_pandas_df_index):
q = [0, 0.33, 0.67, 1.0]
col_names = ["int64_too", "int64_col", "float64_col"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe support the numeric_only parameter also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

bf_result = scalars_df_index[col_names].quantile(q=q).to_pandas()
pd_result = scalars_pandas_df_index[col_names].quantile(q=q)

# Pandas may produce narrower numeric types, but bigframes always produces Float64
pd_result = pd_result.astype("Float64")
pd_result.index = pd_result.index.astype("Float64")

pd.testing.assert_frame_equal(bf_result, pd_result)


@pytest.mark.parametrize(
("op"),
[
Expand Down
35 changes: 35 additions & 0 deletions tests/system/small/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ def test_dataframe_groupby_median(scalars_df_index, scalars_pandas_df_index):
assert ((pd_min <= bf_result_computed) & (bf_result_computed <= pd_max)).all().all()


@pytest.mark.parametrize(
    "q",
    [
        [0.2, 0.4, 0.6, 0.8],
        0.11,
    ],
)
def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q):
    """DataFrame groupby quantile matches pandas for scalar and list ``q``."""
    columns = ["int64_too", "float64_col", "int64_col", "string_col"]

    bf_grouped = scalars_df_index[columns].groupby("string_col")
    actual = bf_grouped.quantile(q).to_pandas()

    pd_grouped = scalars_pandas_df_index[columns].groupby("string_col")
    expected = pd_grouped.quantile(q)

    pd.testing.assert_frame_equal(
        expected, actual, check_dtype=False, check_index_type=False
    )


@pytest.mark.parametrize(
("operator"),
[
Expand Down Expand Up @@ -389,3 +407,20 @@ def test_dataframe_groupby_nonnumeric_with_mean():
pd.testing.assert_frame_equal(
pd_result, bf_result, check_index_type=False, check_dtype=False
)


@pytest.mark.parametrize(
    "q",
    [
        [0.2, 0.4, 0.6, 0.8],
        0.11,
    ],
)
def test_series_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q):
    """Series groupby quantile matches pandas for scalar and list ``q``."""
    bf_grouped = scalars_df_index.groupby("string_col")["int64_col"]
    actual = bf_grouped.quantile(q).to_pandas()

    pd_grouped = scalars_pandas_df_index.groupby("string_col")["int64_col"]
    expected = pd_grouped.quantile(q)

    pd.testing.assert_series_equal(
        expected, actual, check_dtype=False, check_index_type=False
    )
21 changes: 21 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,27 @@ def test_median(scalars_dfs):
assert pd_min < bf_result < pd_max


def test_median_exact(scalars_dfs):
    """``median(exact=True)`` agrees with the pandas median."""
    bf_df, pd_df = scalars_dfs
    column = "int64_col"

    actual = bf_df[column].median(exact=True)
    expected = pd_df[column].median()

    assert math.isclose(expected, actual)


def test_series_quantile(scalars_dfs):
    """Series quantile with a list of quantiles matches pandas."""
    bf_df, pd_df = scalars_dfs
    column = "int64_col"
    bf_column = bf_df[column]
    pd_column = pd_df[column]

    expected = pd_column.quantile([0.0, 0.4, 0.6, 1.0])
    actual = bf_column.quantile([0.0, 0.4, 0.6, 1.0])

    pd.testing.assert_series_equal(
        expected, actual.to_pandas(), check_dtype=False, check_index_type=False
    )


def test_numeric_literal(scalars_dfs):
scalars_df, _ = scalars_dfs
col_name = "numeric_col"
Expand Down
Loading