From 198b40551655d94fe75782edb71956154f16200e Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 03:25:36 +0000 Subject: [PATCH 1/8] feat: upgrade BQ managed `udf` to preview --- bigframes/_config/experiment_options.py | 14 ---- bigframes/functions/_function_client.py | 73 ++++++++++++++----- bigframes/functions/_function_session.py | 14 ---- bigframes/functions/_utils.py | 7 +- noxfile.py | 8 +- .../large/functions/test_managed_function.py | 52 +++---------- 6 files changed, 71 insertions(+), 97 deletions(-) diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py index 3d52976004..24689d0c19 100644 --- a/bigframes/_config/experiment_options.py +++ b/bigframes/_config/experiment_options.py @@ -54,17 +54,3 @@ def blob(self, value: bool): ) warnings.warn(msg, category=bfe.PreviewWarning) self._blob = value - - @property - def udf(self) -> bool: - return self._udf - - @udf.setter - def udf(self, value: bool): - if value is True: - msg = bfe.format_message( - "BigFrames managed function (udf) is still under experiments. " - "It may not work and subject to change in the future." - ) - warnings.warn(msg, category=bfe.PreviewWarning) - self._udf = value diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 37b435eeec..c28af02398 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -53,6 +53,9 @@ } ) +# BQ managed functions (@udf) currently only support Python 3.11. +_MANAGED_FUNC_PYTHON_VERSION = "python-3.11" + class FunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation. @@ -193,11 +196,20 @@ def provision_bq_managed_function( name, packages, is_row_processor, + *, + capture_references=False, ): """Create a BigQuery managed function.""" - import cloudpickle - pickled = cloudpickle.dumps(func) + if capture_references: + # Check the Python version. 
+ python_version = _utils.get_python_version() + if python_version != _MANAGED_FUNC_PYTHON_VERSION: + raise bf_formatting.create_exception_with_feedback_link( + RuntimeError, + f"Python version {python_version} is not supported yet for " + "BigFrames managed function.", + ) # Create BQ managed function. bq_function_args = [] @@ -209,13 +221,15 @@ def provision_bq_managed_function( bq_function_args.append(f"{name_} {type_}") managed_function_options = { - "runtime_version": _utils.get_python_version(), + "runtime_version": _MANAGED_FUNC_PYTHON_VERSION, "entry_point": "bigframes_handler", } # Augment user package requirements with any internal package # requirements. - packages = _utils._get_updated_package_requirements(packages, is_row_processor) + packages = _utils._get_updated_package_requirements( + packages, is_row_processor, capture_references + ) if packages: managed_function_options["packages"] = packages managed_function_options_str = self._format_function_options( @@ -235,20 +249,45 @@ def provision_bq_managed_function( persistent_func_id = ( f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}" ) - create_function_ddl = textwrap.dedent( - f""" - CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)}) - RETURNS {bq_function_return_type} - LANGUAGE python - OPTIONS ({managed_function_options_str}) - AS r''' + + udf_name = func.__name__ + if capture_references: + # This code path ensures that if the udf body contains any + # references to variables and/or imports outside the body, they are + # captured as well. import cloudpickle - udf = cloudpickle.loads({pickled}) - def bigframes_handler(*args): - return udf(*args) - ''' - """ - ).strip() + + pickled = cloudpickle.dumps(func) + udf_code = textwrap.dedent( + f""" + import cloudpickle + {udf_name} = cloudpickle.loads({pickled}) + """ + ) + else: + # This code path ensures that if the udf body is self contained, + # i.e. 
there are no references to variables or imports outside the + # body. + udf_code = textwrap.dedent(inspect.getsource(func)) + udf_code = udf_code[udf_code.index("def") :] + + create_function_ddl = ( + textwrap.dedent( + f""" + CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)}) + RETURNS {bq_function_return_type} + LANGUAGE python + OPTIONS ({managed_function_options_str}) + AS r''' + __UDF_PLACE_HOLDER__ + def bigframes_handler(*args): + return {udf_name}(*args) + ''' + """ + ) + .strip() + .replace("__UDF_PLACE_HOLDER__", udf_code) + ) self._ensure_dataset_exists() self._create_bq_function(create_function_ddl) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 1444457c90..2f494c5ca6 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -58,9 +58,6 @@ from . import _function_client, _utils -# BQ managed functions (@udf) currently only support Python 3.11. -_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",) - class FunctionSession: """Session to manage bigframes functions.""" @@ -809,17 +806,6 @@ def udf( of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. """ - if not bigframes.options.experiments.udf: - raise bf_formatting.create_exception_with_feedback_link(NotImplementedError) - - # Check the Python version. - python_version = _utils.get_python_version() - if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS: - raise bf_formatting.create_exception_with_feedback_link( - RuntimeError, - f"Python version {python_version} is not supported yet for " - "BigFrames managed function.", - ) # Some defaults may be used from the session if not provided otherwise. 
session = self._resolve_session(session) diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 9247017380..30d8bdcb79 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -64,9 +64,12 @@ def get_remote_function_locations(bq_location): def _get_updated_package_requirements( - package_requirements=None, is_row_processor=False + package_requirements=None, is_row_processor=False, capture_references=False ): - requirements = [f"cloudpickle=={cloudpickle.__version__}"] + requirements = [] + if capture_references: + requirements.append(f"cloudpickle=={cloudpickle.__version__}") + if is_row_processor: # bigframes function will send an entire row of data as json, which # would be converted to a pandas series and processed Ensure numpy diff --git a/noxfile.py b/noxfile.py index 77b32ab15d..1f604adcbf 100644 --- a/noxfile.py +++ b/noxfile.py @@ -59,12 +59,6 @@ DEFAULT_PYTHON_VERSION = "3.10" -# Cloud Run Functions supports Python versions up to 3.12 -# https://cloud.google.com/run/docs/runtimes/python -# Managed Python UDF is supported only in Python 3.11 -# Let's set the E2E tests version to 3.11 to cover most code paths. 
-E2E_TEST_PYTHON_VERSION = "3.11" - UNIT_TEST_PYTHON_VERSIONS = ["3.9", "3.10", "3.11", "3.12", "3.13"] UNIT_TEST_STANDARD_DEPENDENCIES = [ "mock", @@ -424,7 +418,7 @@ def doctest(session: nox.sessions.Session): ) -@nox.session(python=E2E_TEST_PYTHON_VERSION) +@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS[-1]) def e2e(session: nox.sessions.Session): """Run the large tests in system test suite.""" run_system( diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 7c8c74e005..92ffae4068 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -21,8 +21,6 @@ import bigframes.pandas as bpd from tests.system.utils import cleanup_function_assets -bpd.options.experiments.udf = True - def test_managed_function_multiply_with_ibis( session, @@ -121,21 +119,12 @@ def stringify(x): cleanup_function_assets(stringify, bigquery_client, ignore_failures=False) -@pytest.mark.parametrize( - "array_dtype", - [ - bool, - int, - float, - str, - ], -) -def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype): +def test_managed_function_array_output(session, scalars_dfs, dataset_id): try: @session.udf(dataset=dataset_id) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] + def featurize(x: int) -> list[float]: + return [float(i) for i in [x, x + 1, x + 2]] scalars_df, scalars_pandas_df = scalars_dfs @@ -160,7 +149,7 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore # Test on the function from read_gbq_function. 
got = featurize_ref(10) - assert got == [array_dtype(i) for i in [10, 11, 12]] + assert got == [10.0, 11.0, 12.0] bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas() pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) @@ -170,30 +159,18 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore cleanup_function_assets(featurize, session.bqclient, ignore_failures=False) -@pytest.mark.parametrize( - ("typ",), - [ - pytest.param(int), - pytest.param(float), - pytest.param(bool), - pytest.param(str), - pytest.param(bytes), - ], -) def test_managed_function_series_apply( session, - typ, scalars_dfs, ): try: @session.udf() - def foo(x: int) -> typ: # type:ignore - # The bytes() constructor expects a non-negative interger as its arg. - return typ(abs(x)) + def foo(x: int) -> bytes: + return bytes(abs(x)) # Function should still work normally. - assert foo(-2) == typ(2) + assert foo(-2) == bytes(2) assert hasattr(foo, "bigframes_bigquery_function") assert hasattr(foo, "ibis_node") @@ -237,26 +214,15 @@ def foo(x: int) -> typ: # type:ignore cleanup_function_assets(foo, session.bqclient, ignore_failures=False) -@pytest.mark.parametrize( - ("typ",), - [ - pytest.param(int), - pytest.param(float), - pytest.param(bool), - pytest.param(str), - ], -) def test_managed_function_series_apply_array_output( session, - typ, scalars_dfs, ): try: @session.udf() - def foo_list(x: int) -> list[typ]: # type:ignore - # The bytes() constructor expects a non-negative interger as its arg. 
- return [typ(abs(x)), typ(abs(x) + 1)] + def foo_list(x: int) -> list[float]: + return [float(abs(x)), float(abs(x) + 1)] scalars_df, scalars_pandas_df = scalars_dfs From c7f0b3d2902809ea9315221a25bcc1c14e184e90 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 03:36:13 +0000 Subject: [PATCH 2/8] remove `_udf` footprint, add preview warning --- bigframes/_config/experiment_options.py | 1 - bigframes/functions/_function_session.py | 2 ++ .../large/functions/test_managed_function.py | 9 ++++++--- tests/unit/_config/test_experiment_options.py | 15 --------------- 4 files changed, 8 insertions(+), 19 deletions(-) diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py index 24689d0c19..da05d9ad0f 100644 --- a/bigframes/_config/experiment_options.py +++ b/bigframes/_config/experiment_options.py @@ -25,7 +25,6 @@ class ExperimentOptions: def __init__(self): self._semantic_operators: bool = False self._blob: bool = False - self._udf: bool = False @property def semantic_operators(self) -> bool: diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 2f494c5ca6..7fc654287d 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -807,6 +807,8 @@ def udf( https://pip.pypa.io/en/stable/reference/requirements-file-format/. """ + warnings.warn("udf is in preview.", category=bfe.PreviewWarning) + # Some defaults may be used from the session if not provided otherwise. 
session = self._resolve_session(session) diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 92ffae4068..a43dd74a30 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -18,6 +18,7 @@ import pytest import bigframes +import bigframes.exceptions as bfe import bigframes.pandas as bpd from tests.system.utils import cleanup_function_assets @@ -220,9 +221,11 @@ def test_managed_function_series_apply_array_output( ): try: - @session.udf() - def foo_list(x: int) -> list[float]: - return [float(abs(x)), float(abs(x) + 1)] + with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): + + @session.udf() + def foo_list(x: int) -> list[float]: + return [float(abs(x)), float(abs(x) + 1)] scalars_df, scalars_pandas_df = scalars_dfs diff --git a/tests/unit/_config/test_experiment_options.py b/tests/unit/_config/test_experiment_options.py index 9735e494be..8e612be06c 100644 --- a/tests/unit/_config/test_experiment_options.py +++ b/tests/unit/_config/test_experiment_options.py @@ -46,18 +46,3 @@ def test_blob_set_true_shows_warning(): options.blob = True assert options.blob is True - - -def test_udf_default_false(): - options = experiment_options.ExperimentOptions() - - assert options.udf is False - - -def test_udf_set_true_shows_warning(): - options = experiment_options.ExperimentOptions() - - with pytest.warns(bfe.PreviewWarning): - options.udf = True - - assert options.udf is True From 1f09a726b02e8f98ef2a534f6ad8c07448518a1a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 04:15:20 +0000 Subject: [PATCH 3/8] capture references in remote_function --- bigframes/functions/_function_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index c28af02398..f68385ab44 100644 --- 
a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -515,7 +515,7 @@ def provision_bq_remote_function( # Augment user package requirements with any internal package # requirements package_requirements = _utils._get_updated_package_requirements( - package_requirements, is_row_processor + package_requirements, is_row_processor, True ) # Compute a unique hash representing the user code From 3a49c8081a12b329d24054fa2f17a6ed876717b6 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 04:17:52 +0000 Subject: [PATCH 4/8] keep capture_references default to the current behavior (True) for remote_function --- bigframes/functions/_function_client.py | 2 +- bigframes/functions/_utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index f68385ab44..c28af02398 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -515,7 +515,7 @@ def provision_bq_remote_function( # Augment user package requirements with any internal package # requirements package_requirements = _utils._get_updated_package_requirements( - package_requirements, is_row_processor, True + package_requirements, is_row_processor ) # Compute a unique hash representing the user code diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 30d8bdcb79..1d930a280d 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -64,7 +64,7 @@ def get_remote_function_locations(bq_location): def _get_updated_package_requirements( - package_requirements=None, is_row_processor=False, capture_references=False + package_requirements=None, is_row_processor=False, capture_references=True ): requirements = [] if capture_references: From b36213ebb28195eff1253036f9c31b297b7f40f5 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 17:34:25 +0000 Subject: [PATCH 5/8] target 
e2e tests to python 3.12 --- noxfile.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 1f604adcbf..bcab34d0c0 100644 --- a/noxfile.py +++ b/noxfile.py @@ -59,6 +59,10 @@ DEFAULT_PYTHON_VERSION = "3.10" +# Cloud Run Functions supports Python versions up to 3.12 +# https://cloud.google.com/run/docs/runtimes/python +E2E_TEST_PYTHON_VERSION = "3.12" + UNIT_TEST_PYTHON_VERSIONS = ["3.9", "3.10", "3.11", "3.12", "3.13"] UNIT_TEST_STANDARD_DEPENDENCIES = [ "mock", @@ -418,7 +422,7 @@ def doctest(session: nox.sessions.Session): ) -@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS[-1]) +@nox.session(python=E2E_TEST_PYTHON_VERSION) def e2e(session: nox.sessions.Session): """Run the large tests in system test suite.""" run_system( From 5e9cf93dff440e38c359ac4e5e4729d6b9c73d00 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 19:23:53 +0000 Subject: [PATCH 6/8] used "udf" instead of "managed function" in the user facing messages --- bigframes/functions/_function_client.py | 20 +++++++++++--------- bigframes/functions/_function_session.py | 5 ++--- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index c28af02398..44aea57898 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -201,15 +201,17 @@ def provision_bq_managed_function( ): """Create a BigQuery managed function.""" - if capture_references: - # Check the Python version. - python_version = _utils.get_python_version() - if python_version != _MANAGED_FUNC_PYTHON_VERSION: - raise bf_formatting.create_exception_with_feedback_link( - RuntimeError, - f"Python version {python_version} is not supported yet for " - "BigFrames managed function.", - ) + # TODO(b/406283812): Expose the capability to pass down + # capture_references=True in the public udf API. 
+ if ( + capture_references + and (python_version := _utils.get_python_version()) + != _MANAGED_FUNC_PYTHON_VERSION + ): + raise bf_formatting.create_exception_with_feedback_link( + NotImplementedError, + f"Capturing references for udf is currently supported only in Python version {_MANAGED_FUNC_PYTHON_VERSION}, you are running {python_version}.", + ) # Create BQ managed function. bq_function_args = [] diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 7fc654287d..eb76464aa9 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -850,7 +850,7 @@ def wrapper(func): ValueError, "'input_types' was not set and parameter " f"'{parameter.name}' is missing a type annotation. " - "Types are required to use managed function.", + "Types are required to use udf.", ) input_types.append(param_type) elif not isinstance(input_types, collections.abc.Sequence): @@ -863,8 +863,7 @@ def wrapper(func): raise bf_formatting.create_exception_with_feedback_link( ValueError, "'output_type' was not set and function is missing a " - "return type annotation. Types are required to use " - "managed function.", + "return type annotation. 
Types are required to use udf", ) # The function will actually be receiving a pandas Series, but allow From bcd9529dbc17af4fc89954af07010b3da06cc45a Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Tue, 25 Mar 2025 22:03:58 +0000 Subject: [PATCH 7/8] clarify the self-contained udf in the doc --- bigframes/functions/_function_session.py | 8 +++++++- bigframes/session/__init__.py | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index eb76464aa9..c04de54be6 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -755,7 +755,13 @@ def udf( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, ): - """Decorator to turn a Python udf into a BigQuery managed function. + """Decorator to turn a Python user defined function (udf) into a + BigQuery managed function. + + .. note:: + The udf must be self-contained, i.e. it must not contain any + references to an import or variable defined outside the function + body. .. note:: Please have following IAM roles enabled for you: diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index dfee41c90b..9de0e091f3 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1435,7 +1435,13 @@ def udf( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, ): - """Decorator to turn a Python udf into a BigQuery managed function. + """Decorator to turn a Python user defined function (udf) into a + BigQuery managed function. + + .. note:: + The udf must be self-contained, i.e. it must not contain any + references to an import or variable defined outside the function + body. .. 
note:: Please have following IAM roles enabled for you: From 4d6dd0ef9d4a88299d074e7ad79ae3b9f8ea750d Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Wed, 26 Mar 2025 01:27:25 +0000 Subject: [PATCH 8/8] export udf from pandas module --- bigframes/pandas/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 8ea7e6c320..730c287e1f 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -362,4 +362,5 @@ def reset_session(): "get_global_session", "close_session", "reset_session", + "udf", ]