From 88fd3b461f447e8a778d7cf6dd8c5b7587824e14 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 31 Jul 2024 23:30:46 +0000 Subject: [PATCH 1/8] chore: apply `remote_function` on the original series This change tests application of remote function without reprojecting the original series. --- bigframes/series.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bigframes/series.py b/bigframes/series.py index 1a5661529c..b3260da8c9 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -1475,8 +1475,7 @@ def apply( raise # We are working with remote function at this point - reprojected_series = Series(self._block._force_reproject()) - result_series = reprojected_series._apply_unary_op( + result_series = self._apply_unary_op( ops.RemoteFunctionOp(func=func, apply_on_null=True) ) From 828b67a90128169be85ef519a57cf924b4a1193d Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Sat, 31 Aug 2024 02:20:49 +0000 Subject: [PATCH 2/8] add failing mask doctest as a system test for easier debugging --- tests/system/large/test_remote_function.py | 25 ++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index d6eefc1e31..da306f8a81 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -718,6 +718,31 @@ def inner_test(): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_udf_mask_repr(session, dataset_id, bq_cf_connection): + try: + + @session.remote_function( + dataset=dataset_id, + reuse=False, + ) + def should_mask(name: str) -> bool: + hash = 0 + for char_ in name: + hash += ord(char_) + return hash % 2 == 0 + + s = bpd.Series(["Alice", "Bob", "Caroline"]) + repr(s.mask(should_mask)) + repr(s.mask(should_mask, "REDACTED")) + + finally: + # clean up the gcp assets created for the remote function + cleanup_remote_function_assets( + session.bqclient, session.cloudfunctionsclient, 
should_mask + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_mask_default_value( session, scalars_dfs, dataset_id, bq_cf_connection From a16e909c1650ac2200f35c8ee512b10a3a01b7e8 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 3 Sep 2024 19:24:30 +0000 Subject: [PATCH 3/8] more comprehensive repr tests --- tests/system/large/test_remote_function.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index da306f8a81..59cfd37074 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -719,7 +719,7 @@ def inner_test(): @pytest.mark.flaky(retries=2, delay=120) -def test_remote_udf_mask_repr(session, dataset_id, bq_cf_connection): +def test_remote_udf_application_repr(session, dataset_id): try: @session.remote_function( @@ -733,6 +733,10 @@ def should_mask(name: str) -> bool: return hash % 2 == 0 s = bpd.Series(["Alice", "Bob", "Caroline"]) + + repr(s.apply(should_mask)) + repr(s.where(s.apply(should_mask))) + repr(s.where(~s.apply(should_mask))) repr(s.mask(should_mask)) repr(s.mask(should_mask, "REDACTED")) From 8eda74c897beec63893b26daa178ef58e2f67649 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 10 Sep 2024 07:01:15 +0000 Subject: [PATCH 4/8] more tests, move to small tests --- tests/system/large/test_remote_function.py | 29 ------ tests/system/small/test_remote_function.py | 110 +++++++++++++++++++++ 2 files changed, 110 insertions(+), 29 deletions(-) diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index 3e989ed353..77ea4627ec 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -718,35 +718,6 @@ def inner_test(): ) -@pytest.mark.flaky(retries=2, delay=120) -def test_remote_udf_application_repr(session, dataset_id): - try: - - @session.remote_function( - 
dataset=dataset_id, - reuse=False, - ) - def should_mask(name: str) -> bool: - hash = 0 - for char_ in name: - hash += ord(char_) - return hash % 2 == 0 - - s = bpd.Series(["Alice", "Bob", "Caroline"]) - - repr(s.apply(should_mask)) - repr(s.where(s.apply(should_mask))) - repr(s.where(~s.apply(should_mask))) - repr(s.mask(should_mask)) - repr(s.mask(should_mask, "REDACTED")) - - finally: - # clean up the gcp assets created for the remote function - cleanup_remote_function_assets( - session.bqclient, session.cloudfunctionsclient, should_mask - ) - - @pytest.mark.flaky(retries=2, delay=120) def test_remote_udf_mask_default_value( session, scalars_dfs, dataset_id, bq_cf_connection diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index b000354ed4..5ffda56f92 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import re import google.api_core.exceptions @@ -972,3 +973,112 @@ def echo_len(row): bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview." 
): scalars_df[[column]].apply(echo_len_remote, axis=1) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_application_repr(session, dataset_id_permanent): + # This function deliberately has a param with name "name", this is to test + # a specific ibis' internal handling of object names + def should_mask(name: str) -> bool: + hash = 0 + for char_ in name: + hash += ord(char_) + return hash % 2 == 0 + + assert "name" in inspect.signature(should_mask).parameters + + should_mask = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(should_mask) + )(should_mask) + + s = bigframes.series.Series(["Alice", "Bob", "Caroline"]) + + repr(s.apply(should_mask)) + repr(s.where(s.apply(should_mask))) + repr(s.where(~s.apply(should_mask))) + repr(s.mask(should_mask)) + repr(s.mask(should_mask, "REDACTED")) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_index): + gbq_function = f"{dataset_id}.should_mask" + + # This function deliberately has a param with name "name", this is to test + # a specific ibis' internal handling of object names + session.bqclient.query_and_wait( + f"CREATE OR REPLACE FUNCTION `{gbq_function}`(name STRING) RETURNS BOOL AS (MOD(LENGTH(name), 2) = 1)" + ) + routine = session.bqclient.get_routine(gbq_function) + assert "name" in [arg.name for arg in routine.arguments] + + # read the function and apply to dataframe + should_mask = session.read_gbq_function(gbq_function) + + s = scalars_df_index["string_col"] + + repr(s.apply(should_mask)) + repr(s.where(s.apply(should_mask))) + repr(s.where(~s.apply(should_mask))) + repr(s.mask(should_mask)) + repr(s.mask(should_mask, "REDACTED")) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs): + + # This function is deliberately written to not work with NA input + def plus_one(x: int) -> int: + return x + 1 + + scalars_df, 
scalars_pandas_df = scalars_dfs + int_col_name_with_nulls = "int64_col" + + # make sure there are NA values in the test column + assert any([pd.isna(val) for val in scalars_df[int_col_name_with_nulls]]) + + # create a remote function + plus_one_remote = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(plus_one) + )(plus_one) + + # with nulls in the series the remote function application would fail + with pytest.raises( + google.api_core.exceptions.BadRequest, match="unsupported operand" + ): + scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas() + + # after filtering out nulls the remote function application should work + # similar to pandas + pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][ + int_col_name_with_nulls + ].apply(plus_one) + bf_result = ( + scalars_df[scalars_df[int_col_name_with_nulls].notnull()][ + int_col_name_with_nulls + ] + .apply(plus_one_remote) + .to_pandas() + ) + + # ignore pandas "int64" vs bigframes "Int64" dtype difference + pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent): + session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial")) + + df = session.read_gbq("bigquery-public-data.baseball.schedules")[ + ["duration_minutes"] + ] + + def plus_one(x: int) -> int: + return x + 1 + + plus_one = session.remote_function( + dataset=dataset_id_permanent, name=get_rf_name(plus_one) + )(plus_one) + + df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one)) + repr(df1) From db731c238c477d3df7452cb7fcc65435292fa444 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 10 Sep 2024 19:47:27 +0000 Subject: [PATCH 5/8] rename "name" param --- bigframes/functions/_remote_function_session.py | 14 ++++++++++++++ bigframes/functions/remote_function.py | 7 ++++++- 2 files changed, 20 
insertions(+), 1 deletion(-) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index c69e430836..a29e03966a 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -499,6 +499,20 @@ def try_delattr(attr): cloud_function_memory_mib=cloud_function_memory_mib, ) + # TODO(shobs): Find a better way to support udfs with param named "name". + # This causes an issue in the ibis compilation. + func.__signature__ = inspect.signature(func).replace( # type: ignore + parameters=[ + inspect.Parameter( + f"bigframes_{param.name}" + if param.name == "name" + else param.name, + param.kind, + ) + for param in inspect.signature(func).parameters.values() + ] + ) + # TODO: Move ibis logic to compiler step node = ibis.udf.scalar.builtin( func, diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 39e3bfd8f0..4fbfe39472 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -153,7 +153,12 @@ def func(*ignored_args, **ignored_kwargs): func.__signature__ = inspect.signature(func).replace( # type: ignore parameters=[ - inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD) + # TODO(shobs): Find a better way to support functions with param + # named "name". This causes an issue in the ibis compilation. 
+ inspect.Parameter( + f"bigframes_{name}" if name == "name" else name, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) for name in ibis_signature.parameter_names ] ) From 291e1cb7a7936aba91eb0607c0771aa1d1dbcca4 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 11 Sep 2024 00:03:08 +0000 Subject: [PATCH 6/8] manipulate copy of the original udf --- bigframes/functions/_remote_function_session.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 37cbda6195..9949e93cff 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -22,6 +22,7 @@ from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union import warnings +import cloudpickle import google.api_core.exceptions from google.cloud import ( bigquery, @@ -381,6 +382,11 @@ def wrapper(func): if not callable(func): raise TypeError("f must be callable, got {}".format(func)) + # To respect the user code/environment let's use a copy of the + # original udf, especially since we would be setting some properties + # on it + func = cloudpickle.loads(cloudpickle.dumps(func)) + if sys.version_info >= (3, 10): # Add `eval_str = True` so that deferred annotations are turned into their # corresponding type objects. Need Python 3.10 for eval_str parameter. 
From 0d6895612732f6e7aef9d510600a6e26131a6d45 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 11 Sep 2024 01:48:28 +0000 Subject: [PATCH 7/8] move the function copy after i/o types resolution --- bigframes/functions/_remote_function_session.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 9949e93cff..07e5acba0c 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -382,11 +382,6 @@ def wrapper(func): if not callable(func): raise TypeError("f must be callable, got {}".format(func)) - # To respect the user code/environment let's use a copy of the - # original udf, especially since we would be setting some properties - # on it - func = cloudpickle.loads(cloudpickle.dumps(func)) - if sys.version_info >= (3, 10): # Add `eval_str = True` so that deferred annotations are turned into their # corresponding type objects. Need Python 3.10 for eval_str parameter. @@ -464,6 +459,11 @@ def wrapper(func): session=session, # type: ignore ) + # To respect the user code/environment let's use a copy of the + # original udf, especially since we would be setting some properties + # on it + func = cloudpickle.loads(cloudpickle.dumps(func)) + # In the unlikely case where the user is trying to re-deploy the same # function, cleanup the attributes we add below, first. 
This prevents # the pickle from having dependencies that might not otherwise be From cbbed9bb16503a5b75ac674320f735d10ca7d6a1 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 11 Sep 2024 21:14:16 +0000 Subject: [PATCH 8/8] rename all params to avoid collisions, widely use bigframes_ prefix for consistency --- bigframes/functions/_remote_function_session.py | 4 +--- bigframes/functions/remote_function.py | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 07e5acba0c..c947fcdc63 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -510,9 +510,7 @@ def try_delattr(attr): func.__signature__ = inspect.signature(func).replace( # type: ignore parameters=[ inspect.Parameter( - f"bigframes_{param.name}" - if param.name == "name" - else param.name, + f"bigframes_{param.name}", param.kind, ) for param in inspect.signature(func).parameters.values() diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 4fbfe39472..b4c74e90d6 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -144,11 +144,11 @@ def read_gbq_function( # The name "args" conflicts with the Ibis operator, so we use # non-standard names for the arguments here. - def func(*ignored_args, **ignored_kwargs): + def func(*bigframes_args, **bigframes_kwargs): f"""Remote function {str(routine_ref)}.""" nonlocal node # type: ignore - expr = node(*ignored_args, **ignored_kwargs) # type: ignore + expr = node(*bigframes_args, **bigframes_kwargs) # type: ignore return ibis_client.execute(expr) func.__signature__ = inspect.signature(func).replace( # type: ignore @@ -156,7 +156,7 @@ def func(*ignored_args, **ignored_kwargs): # TODO(shobs): Find a better way to support functions with param # named "name". 
This causes an issue in the ibis compilation. inspect.Parameter( - f"bigframes_{name}" if name == "name" else name, + f"bigframes_{name}", inspect.Parameter.POSITIONAL_OR_KEYWORD, ) for name in ibis_signature.parameter_names