Skip to content

Commit faac725

Browse files
Merge pull request apache#15165 from [BEAM-12593] Verify DataFrame API on pandas 1.3.0
* [BEAM-12593] Verify DataFrame API on pandas 1.3.0 * Fills correct partitioning for fillna * Add version number to lib * Condition on incompatible version features * Due to BEAM-12495 value_counts with dropna=False does not work on frames * Added NotImplementedError when detecting BEAM-12495 * fix insert/value_counts doc test * update Pandas doctests for groupby for 1.3.x Pandas * test dropna&normalize combinations Co-authored-by: Brian Hulette <bhulette@google.com>
1 parent a14f7e8 commit faac725

File tree

4 files changed

+105
-22
lines changed

4 files changed

+105
-22
lines changed

sdks/python/apache_beam/dataframe/frames.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@
5555
'DeferredDataFrame',
5656
]
5757

58+
# Get major, minor version
59+
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
60+
5861

5962
def populate_not_implemented(pd_type):
6063
def wrapper(deferred_type):
@@ -1932,7 +1935,7 @@ def value_counts(
19321935
else:
19331936
column = self
19341937

1935-
result = column.groupby(column).size()
1938+
result = column.groupby(column, dropna=dropna).size()
19361939

19371940
# groupby.size() names the index, which we don't need
19381941
result.index.name = None
@@ -2392,8 +2395,8 @@ def aggregate(self, func, axis, *args, **kwargs):
23922395
if func in ('quantile',):
23932396
return getattr(self, func)(*args, axis=axis, **kwargs)
23942397

2395-
# Maps to a property, args are ignored
2396-
if func in ('size',):
2398+
# In pandas<1.3.0, maps to a property, args are ignored
2399+
if func in ('size',) and PD_VERSION < (1, 3):
23972400
return getattr(self, func)
23982401

23992402
# We also have specialized distributed implementations for these. They only
@@ -3392,7 +3395,7 @@ def melt(self, ignore_index, **kwargs):
33923395

33933396
@frame_base.with_docs_from(pd.DataFrame)
33943397
def value_counts(self, subset=None, sort=False, normalize=False,
3395-
ascending=False):
3398+
ascending=False, dropna=True):
33963399
"""``sort`` is ``False`` by default, and ``sort=True`` is not supported
33973400
because it imposes an ordering on the dataset which likely will not be
33983401
preserved."""
@@ -3403,10 +3406,16 @@ def value_counts(self, subset=None, sort=False, normalize=False,
34033406
"ordering on the dataset which likely will not be preserved.",
34043407
reason="order-sensitive")
34053408
columns = subset or list(self.columns)
3406-
result = self.groupby(columns).size()
3409+
3410+
if dropna:
3411+
dropped = self.dropna()
3412+
else:
3413+
dropped = self
3414+
3415+
result = dropped.groupby(columns, dropna=dropna).size()
34073416

34083417
if normalize:
3409-
return result/self.dropna().length()
3418+
return result/dropped.length()
34103419
else:
34113420
return result
34123421

sdks/python/apache_beam/dataframe/frames_test.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
from apache_beam.dataframe import frame_base
2626
from apache_beam.dataframe import frames
2727

28-
PD_VERSION = tuple(map(int, pd.__version__.split('.')))
28+
# Get major, minor version
29+
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
2930

3031
GROUPBY_DF = pd.DataFrame({
3132
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
@@ -235,6 +236,17 @@ def test_dataframe_arithmetic(self):
235236
self._run_test(
236237
lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)
237238

239+
@unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
240+
def test_value_counts_dropna_false(self):
241+
df = pd.DataFrame({
242+
'first_name': ['John', 'Anne', 'John', 'Beth'],
243+
'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
244+
})
245+
# TODO(BEAM-12495): Remove the assertRaises this when the underlying bug in
246+
# https://github.com/pandas-dev/pandas/issues/36470 is fixed.
247+
with self.assertRaises(NotImplementedError):
248+
self._run_test(lambda df: df.value_counts(dropna=False), df)
249+
238250
def test_get_column(self):
239251
df = pd.DataFrame({
240252
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
@@ -369,10 +381,15 @@ def test_combine_dataframe_fill(self):
369381
nonparallel=True)
370382

371383
def test_combine_Series(self):
372-
with expressions.allow_non_parallel_operations():
373-
s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
374-
s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
375-
self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2)
384+
s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
385+
s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
386+
self._run_test(
387+
lambda s1,
388+
s2: s1.combine(s2, max),
389+
s1,
390+
s2,
391+
nonparallel=True,
392+
check_proxy=False)
376393

377394
def test_combine_first_dataframe(self):
378395
df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
@@ -587,8 +604,27 @@ def test_value_counts_with_nans(self):
587604
self._run_test(lambda df: df.value_counts(), df)
588605
self._run_test(lambda df: df.value_counts(normalize=True), df)
589606

607+
if PD_VERSION >= (1, 3):
608+
# dropna=False is new in pandas 1.3
609+
# TODO(BEAM-12495): Remove the assertRaises this when the underlying bug
610+
# in https://github.com/pandas-dev/pandas/issues/36470 is fixed.
611+
with self.assertRaises(NotImplementedError):
612+
self._run_test(lambda df: df.value_counts(dropna=False), df)
613+
614+
# Test the defaults.
590615
self._run_test(lambda df: df.num_wings.value_counts(), df)
591616
self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
617+
self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)
618+
619+
# Test the combination interactions.
620+
for normalize in (True, False):
621+
for dropna in (True, False):
622+
self._run_test(
623+
lambda df,
624+
dropna=dropna,
625+
normalize=normalize: df.num_wings.value_counts(
626+
dropna=dropna, normalize=normalize),
627+
df)
592628

593629
def test_value_counts_does_not_support_sort(self):
594630
df = pd.DataFrame({

sdks/python/apache_beam/dataframe/pandas_doctests_test.py

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import pandas as pd
2121

2222
from apache_beam.dataframe import doctests
23+
from apache_beam.dataframe.frames import PD_VERSION
2324
from apache_beam.dataframe.pandas_top_level_functions import _is_top_level_function
2425

2526

@@ -68,7 +69,8 @@ def test_ndframe_tests(self):
6869
"df.replace(regex={r'^ba.$': 'new', 'foo': 'xyz'})"
6970
],
7071
'pandas.core.generic.NDFrame.fillna': [
71-
"df.fillna(method='ffill')",
72+
'df.fillna(method=\'ffill\')',
73+
'df.fillna(method="ffill")',
7274
'df.fillna(value=values, limit=1)',
7375
],
7476
'pandas.core.generic.NDFrame.sort_values': ['*'],
@@ -164,7 +166,8 @@ def test_dataframe_tests(self):
164166
'pandas.core.frame.DataFrame.cumprod': ['*'],
165167
'pandas.core.frame.DataFrame.diff': ['*'],
166168
'pandas.core.frame.DataFrame.fillna': [
167-
"df.fillna(method='ffill')",
169+
'df.fillna(method=\'ffill\')',
170+
'df.fillna(method="ffill")',
168171
'df.fillna(value=values, limit=1)',
169172
],
170173
'pandas.core.frame.DataFrame.items': ['*'],
@@ -237,13 +240,17 @@ def test_dataframe_tests(self):
237240
# reindex not supported
238241
's2 = s.reindex([1, 0, 2, 3])',
239242
],
243+
'pandas.core.frame.DataFrame.resample': ['*'],
244+
'pandas.core.frame.DataFrame.values': ['*'],
240245
},
241246
not_implemented_ok={
242247
'pandas.core.frame.DataFrame.transform': [
243248
# str arg not supported. Tested with np.sum in
244249
# frames_test.py::DeferredFrameTest::test_groupby_transform_sum
245250
"df.groupby('Date')['Data'].transform('sum')",
246251
],
252+
'pandas.core.frame.DataFrame.swaplevel': ['*'],
253+
'pandas.core.frame.DataFrame.melt': ['*'],
247254
'pandas.core.frame.DataFrame.reindex_axis': ['*'],
248255
'pandas.core.frame.DataFrame.round': [
249256
'df.round(decimals)',
@@ -267,13 +274,20 @@ def test_dataframe_tests(self):
267274
'pandas.core.frame.DataFrame.set_index': [
268275
"df.set_index([s, s**2])",
269276
],
277+
278+
# TODO(BEAM-12495)
279+
'pandas.core.frame.DataFrame.value_counts': [
280+
'df.value_counts(dropna=False)'
281+
],
270282
},
271283
skip={
272284
# s2 created with reindex
273285
'pandas.core.frame.DataFrame.dot': [
274286
'df.dot(s2)',
275287
],
276288

289+
'pandas.core.frame.DataFrame.resample': ['df'],
290+
'pandas.core.frame.DataFrame.asfreq': ['*'],
277291
# Throws NotImplementedError when modifying df
278292
'pandas.core.frame.DataFrame.axes': [
279293
# Returns deferred index.
@@ -302,6 +316,14 @@ def test_dataframe_tests(self):
302316
'pandas.core.frame.DataFrame.to_markdown': ['*'],
303317
'pandas.core.frame.DataFrame.to_parquet': ['*'],
304318

319+
# Raises right exception, but testing framework has matching issues.
320+
# Tested in `frames_test.py`.
321+
'pandas.core.frame.DataFrame.insert': [
322+
'df',
323+
'df.insert(1, "newcol", [99, 99])',
324+
'df.insert(0, "col1", [100, 100], allow_duplicates=True)'
325+
],
326+
305327
'pandas.core.frame.DataFrame.to_records': [
306328
'df.index = df.index.rename("I")',
307329
'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x
@@ -385,7 +407,8 @@ def test_series_tests(self):
385407
's.dot(arr)', # non-deferred result
386408
],
387409
'pandas.core.series.Series.fillna': [
388-
"df.fillna(method='ffill')",
410+
'df.fillna(method=\'ffill\')',
411+
'df.fillna(method="ffill")',
389412
'df.fillna(value=values, limit=1)',
390413
],
391414
'pandas.core.series.Series.items': ['*'],
@@ -434,11 +457,11 @@ def test_series_tests(self):
434457
's.drop_duplicates()',
435458
"s.drop_duplicates(keep='last')",
436459
],
437-
'pandas.core.series.Series.repeat': [
438-
's.repeat([1, 2, 3])'
439-
],
440460
'pandas.core.series.Series.reindex': ['*'],
441461
'pandas.core.series.Series.autocorr': ['*'],
462+
'pandas.core.series.Series.repeat': ['s.repeat([1, 2, 3])'],
463+
'pandas.core.series.Series.resample': ['*'],
464+
'pandas.core.series.Series': ['ser.iloc[0] = 999'],
442465
},
443466
not_implemented_ok={
444467
'pandas.core.series.Series.transform': [
@@ -451,15 +474,19 @@ def test_series_tests(self):
451474
'ser.groupby(["a", "b", "a", np.nan]).mean()',
452475
'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()',
453476
],
477+
'pandas.core.series.Series.swaplevel' :['*']
454478
},
455479
skip={
480+
# Relies on setting values with iloc
481+
'pandas.core.series.Series': ['ser', 'r'],
456482
'pandas.core.series.Series.groupby': [
457483
# TODO(BEAM-11393): This example requires aligning two series
458484
# with non-unique indexes. It only works in pandas because
459485
# pandas can recognize the indexes are identical and elide the
460486
# alignment.
461487
'ser.groupby(ser > 100).mean()',
462488
],
489+
'pandas.core.series.Series.asfreq': ['*'],
463490
# error formatting
464491
'pandas.core.series.Series.append': [
465492
's1.append(s2, verify_integrity=True)',
@@ -491,12 +518,12 @@ def test_series_tests(self):
491518
# Inspection after modification.
492519
's'
493520
],
521+
'pandas.core.series.Series.resample': ['df'],
494522
})
495523
self.assertEqual(result.failed, 0)
496524

497525
def test_string_tests(self):
498-
PD_VERSION = tuple(int(v) for v in pd.__version__.split('.'))
499-
if PD_VERSION < (1, 2, 0):
526+
if PD_VERSION < (1, 2):
500527
module = pd.core.strings
501528
else:
502529
# Definitions were moved to accessor in pandas 1.2.0
@@ -668,11 +695,13 @@ def test_groupby_tests(self):
668695
'pandas.core.groupby.generic.SeriesGroupBy.diff': ['*'],
669696
'pandas.core.groupby.generic.DataFrameGroupBy.hist': ['*'],
670697
'pandas.core.groupby.generic.DataFrameGroupBy.fillna': [
671-
"df.fillna(method='ffill')",
698+
'df.fillna(method=\'ffill\')',
699+
'df.fillna(method="ffill")',
672700
'df.fillna(value=values, limit=1)',
673701
],
674702
'pandas.core.groupby.generic.SeriesGroupBy.fillna': [
675-
"df.fillna(method='ffill')",
703+
'df.fillna(method=\'ffill\')',
704+
'df.fillna(method="ffill")',
676705
'df.fillna(value=values, limit=1)',
677706
],
678707
},
@@ -682,6 +711,7 @@ def test_groupby_tests(self):
682711
'pandas.core.groupby.generic.SeriesGroupBy.transform': ['*'],
683712
'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['*'],
684713
'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['*'],
714+
'pandas.core.groupby.generic.SeriesGroupBy.apply': ['*'],
685715
},
686716
skip={
687717
'pandas.core.groupby.generic.SeriesGroupBy.cov': [
@@ -698,6 +728,14 @@ def test_groupby_tests(self):
698728
# These examples rely on grouping by a list
699729
'pandas.core.groupby.generic.SeriesGroupBy.aggregate': ['*'],
700730
'pandas.core.groupby.generic.DataFrameGroupBy.aggregate': ['*'],
731+
'pandas.core.groupby.generic.SeriesGroupBy.transform': [
732+
# Dropping invalid columns during a transform is unsupported.
733+
'grouped.transform(lambda x: (x - x.mean()) / x.std())'
734+
],
735+
'pandas.core.groupby.generic.DataFrameGroupBy.transform': [
736+
# Dropping invalid columns during a transform is unsupported.
737+
'grouped.transform(lambda x: (x - x.mean()) / x.std())'
738+
],
701739
})
702740
self.assertEqual(result.failed, 0)
703741

sdks/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def get_version():
168168
'mock>=1.0.1,<3.0.0',
169169
'nose>=1.3.7',
170170
'nose_xunitmp>=0.4.1',
171-
'pandas>=1.0,<1.3.0',
171+
'pandas>=1.0,<1.4.0',
172172
'parameterized>=0.7.1,<0.8.0',
173173
'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
174174
'pyyaml>=3.12,<6.0.0',

0 commit comments

Comments
 (0)