diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py index abe465de50..bb3966839c 100644 --- a/bigframes/_config/experiment_options.py +++ b/bigframes/_config/experiment_options.py @@ -26,7 +26,6 @@ def __init__(self): self._semantic_operators: bool = False self._ai_operators: bool = False self._blob: bool = False - self._udf: bool = False @property def semantic_operators(self) -> bool: @@ -68,17 +67,3 @@ def blob(self, value: bool): ) warnings.warn(msg, category=bfe.PreviewWarning) self._blob = value - - @property - def udf(self) -> bool: - return self._udf - - @udf.setter - def udf(self, value: bool): - if value is True: - msg = bfe.format_message( - "BigFrames managed function (udf) is still under experiments. " - "It may not work and subject to change in the future." - ) - warnings.warn(msg, category=bfe.PreviewWarning) - self._udf = value diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 37b435eeec..44aea57898 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -53,6 +53,9 @@ } ) +# BQ managed functions (@udf) currently only support Python 3.11. +_MANAGED_FUNC_PYTHON_VERSION = "python-3.11" + class FunctionClient: # Wait time (in seconds) for an IAM binding to take effect after creation. @@ -193,11 +196,22 @@ def provision_bq_managed_function( name, packages, is_row_processor, + *, + capture_references=False, ): """Create a BigQuery managed function.""" - import cloudpickle - pickled = cloudpickle.dumps(func) + # TODO(b/406283812): Expose the capability to pass down + # capture_references=True in the public udf API. + if ( + capture_references + and (python_version := _utils.get_python_version()) + != _MANAGED_FUNC_PYTHON_VERSION + ): + raise bf_formatting.create_exception_with_feedback_link( + NotImplementedError, + f"Capturing references for udf is currently supported only in Python version {_MANAGED_FUNC_PYTHON_VERSION}, you are running {python_version}.", + ) # Create BQ managed function. bq_function_args = [] @@ -209,13 +223,15 @@ def provision_bq_managed_function( bq_function_args.append(f"{name_} {type_}") managed_function_options = { - "runtime_version": _utils.get_python_version(), + "runtime_version": _MANAGED_FUNC_PYTHON_VERSION, "entry_point": "bigframes_handler", } # Augment user package requirements with any internal package # requirements. - packages = _utils._get_updated_package_requirements(packages, is_row_processor) + packages = _utils._get_updated_package_requirements( + packages, is_row_processor, capture_references + ) if packages: managed_function_options["packages"] = packages managed_function_options_str = self._format_function_options( @@ -235,20 +251,45 @@ def provision_bq_managed_function( persistent_func_id = ( f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}" ) - create_function_ddl = textwrap.dedent( - f""" - CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)}) - RETURNS {bq_function_return_type} - LANGUAGE python - OPTIONS ({managed_function_options_str}) - AS r''' + + udf_name = func.__name__ + if capture_references: + # This code path ensures that if the udf body contains any + # references to variables and/or imports outside the body, they are + # captured as well. import cloudpickle - udf = cloudpickle.loads({pickled}) - def bigframes_handler(*args): - return udf(*args) - ''' - """ - ).strip() + + pickled = cloudpickle.dumps(func) + udf_code = textwrap.dedent( + f""" + import cloudpickle + {udf_name} = cloudpickle.loads({pickled}) + """ + ) + else: + # This code path ensures that if the udf body is self contained, + # i.e. there are no references to variables or imports outside the + # body. + udf_code = textwrap.dedent(inspect.getsource(func)) + udf_code = udf_code[udf_code.index("def") :] + + create_function_ddl = ( + textwrap.dedent( + f""" + CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)}) + RETURNS {bq_function_return_type} + LANGUAGE python + OPTIONS ({managed_function_options_str}) + AS r''' + __UDF_PLACE_HOLDER__ + def bigframes_handler(*args): + return {udf_name}(*args) + ''' + """ + ) + .strip() + .replace("__UDF_PLACE_HOLDER__", udf_code) + ) self._ensure_dataset_exists() self._create_bq_function(create_function_ddl) diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 1444457c90..c04de54be6 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -58,9 +58,6 @@ from . import _function_client, _utils -# BQ managed functions (@udf) currently only support Python 3.11. -_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",) - class FunctionSession: """Session to manage bigframes functions.""" @@ -758,7 +755,13 @@ def udf( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, ): - """Decorator to turn a Python udf into a BigQuery managed function. + """Decorator to turn a Python user defined function (udf) into a + BigQuery managed function. + + .. note:: + The udf must be self-contained, i.e. it must not contain any + references to an import or variable defined outside the function + body. .. note:: Please have following IAM roles enabled for you: @@ -809,17 +812,8 @@ def udf( of the form supported in https://pip.pypa.io/en/stable/reference/requirements-file-format/. """ - if not bigframes.options.experiments.udf: - raise bf_formatting.create_exception_with_feedback_link(NotImplementedError) - # Check the Python version. - python_version = _utils.get_python_version() - if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS: - raise bf_formatting.create_exception_with_feedback_link( - RuntimeError, - f"Python version {python_version} is not supported yet for " - "BigFrames managed function.", - ) + warnings.warn("udf is in preview.", category=bfe.PreviewWarning) # Some defaults may be used from the session if not provided otherwise. session = self._resolve_session(session) @@ -862,7 +856,7 @@ def wrapper(func): ValueError, "'input_types' was not set and parameter " f"'{parameter.name}' is missing a type annotation. " - "Types are required to use managed function.", + "Types are required to use udf.", ) input_types.append(param_type) elif not isinstance(input_types, collections.abc.Sequence): @@ -875,8 +869,7 @@ def wrapper(func): raise bf_formatting.create_exception_with_feedback_link( ValueError, "'output_type' was not set and function is missing a " - "return type annotation. Types are required to use " - "managed function.", + "return type annotation. Types are required to use udf", ) # The function will actually be receiving a pandas Series, but allow diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 9247017380..1d930a280d 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -64,9 +64,12 @@ def get_remote_function_locations(bq_location): def _get_updated_package_requirements( - package_requirements=None, is_row_processor=False + package_requirements=None, is_row_processor=False, capture_references=True ): - requirements = [f"cloudpickle=={cloudpickle.__version__}"] + requirements = [] + if capture_references: + requirements.append(f"cloudpickle=={cloudpickle.__version__}") + if is_row_processor: # bigframes function will send an entire row of data as json, which # would be converted to a pandas series and processed Ensure numpy diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 8ea7e6c320..730c287e1f 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -362,4 +362,5 @@ def reset_session(): "get_global_session", "close_session", "reset_session", + "udf", ] diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index dfee41c90b..9de0e091f3 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1435,7 +1435,13 @@ def udf( name: Optional[str] = None, packages: Optional[Sequence[str]] = None, ): - """Decorator to turn a Python udf into a BigQuery managed function. + """Decorator to turn a Python user defined function (udf) into a + BigQuery managed function. + + .. note:: + The udf must be self-contained, i.e. it must not contain any + references to an import or variable defined outside the function + body. .. note:: Please have following IAM roles enabled for you: diff --git a/noxfile.py b/noxfile.py index 77b32ab15d..bcab34d0c0 100644 --- a/noxfile.py +++ b/noxfile.py @@ -61,9 +61,7 @@ # Cloud Run Functions supports Python versions up to 3.12 # https://cloud.google.com/run/docs/runtimes/python -# Managed Python UDF is supported only in Python 3.11 -# Let's set the E2E tests version to 3.11 to cover most code paths. -E2E_TEST_PYTHON_VERSION = "3.11" +E2E_TEST_PYTHON_VERSION = "3.12" UNIT_TEST_PYTHON_VERSIONS = ["3.9", "3.10", "3.11", "3.12", "3.13"] UNIT_TEST_STANDARD_DEPENDENCIES = [ diff --git a/tests/system/large/functions/test_managed_function.py b/tests/system/large/functions/test_managed_function.py index 7001736bb2..eabafd96fb 100644 --- a/tests/system/large/functions/test_managed_function.py +++ b/tests/system/large/functions/test_managed_function.py @@ -18,6 +18,7 @@ import pytest import bigframes +import bigframes.exceptions as bfe import bigframes.pandas as bpd from tests.system.utils import cleanup_function_assets @@ -27,8 +28,6 @@ reason="temporarily disable to debug managed udf cleanup in the test project" ) -bpd.options.experiments.udf = True - def test_managed_function_multiply_with_ibis( session, @@ -127,21 +126,12 @@ def stringify(x): cleanup_function_assets(stringify, bigquery_client, ignore_failures=False) -@pytest.mark.parametrize( - "array_dtype", - [ - bool, - int, - float, - str, - ], -) -def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype): +def test_managed_function_array_output(session, scalars_dfs, dataset_id): try: @session.udf(dataset=dataset_id) - def featurize(x: int) -> list[array_dtype]: # type: ignore - return [array_dtype(i) for i in [x, x + 1, x + 2]] + def featurize(x: int) -> list[float]: + return [float(i) for i in [x, x + 1, x + 2]] scalars_df, scalars_pandas_df = scalars_dfs @@ -166,7 +156,7 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore # Test on the function from read_gbq_function. got = featurize_ref(10) - assert got == [array_dtype(i) for i in [10, 11, 12]] + assert got == [10.0, 11.0, 12.0] bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas() pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False) @@ -176,30 +166,18 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore cleanup_function_assets(featurize, session.bqclient, ignore_failures=False) -@pytest.mark.parametrize( - ("typ",), - [ - pytest.param(int), - pytest.param(float), - pytest.param(bool), - pytest.param(str), - pytest.param(bytes), - ], -) def test_managed_function_series_apply( session, - typ, scalars_dfs, ): try: @session.udf() - def foo(x: int) -> typ: # type:ignore - # The bytes() constructor expects a non-negative interger as its arg. - return typ(abs(x)) + def foo(x: int) -> bytes: + return bytes(abs(x)) # Function should still work normally. - assert foo(-2) == typ(2) + assert foo(-2) == bytes(2) assert hasattr(foo, "bigframes_bigquery_function") assert hasattr(foo, "ibis_node") @@ -243,26 +221,17 @@ def foo(x: int) -> typ: # type:ignore cleanup_function_assets(foo, session.bqclient, ignore_failures=False) -@pytest.mark.parametrize( - ("typ",), - [ - pytest.param(int), - pytest.param(float), - pytest.param(bool), - pytest.param(str), - ], -) def test_managed_function_series_apply_array_output( session, - typ, scalars_dfs, ): try: - @session.udf() - def foo_list(x: int) -> list[typ]: # type:ignore - # The bytes() constructor expects a non-negative interger as its arg. - return [typ(abs(x)), typ(abs(x) + 1)] + with pytest.warns(bfe.PreviewWarning, match="udf is in preview."): + + @session.udf() + def foo_list(x: int) -> list[float]: + return [float(abs(x)), float(abs(x) + 1)] scalars_df, scalars_pandas_df = scalars_dfs diff --git a/tests/unit/_config/test_experiment_options.py b/tests/unit/_config/test_experiment_options.py index ce1dd0f146..1e5a8326f7 100644 --- a/tests/unit/_config/test_experiment_options.py +++ b/tests/unit/_config/test_experiment_options.py @@ -61,18 +61,3 @@ def test_blob_set_true_shows_warning(): options.blob = True assert options.blob is True - - -def test_udf_default_false(): - options = experiment_options.ExperimentOptions() - - assert options.udf is False - - -def test_udf_set_true_shows_warning(): - options = experiment_options.ExperimentOptions() - - with pytest.warns(bfe.PreviewWarning): - options.udf = True - - assert options.udf is True