Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions bigframes/_config/experiment_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def __init__(self):
self._semantic_operators: bool = False
self._ai_operators: bool = False
self._blob: bool = False
self._udf: bool = False

@property
def semantic_operators(self) -> bool:
Expand Down Expand Up @@ -68,17 +67,3 @@ def blob(self, value: bool):
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._blob = value

@property
def udf(self) -> bool:
return self._udf

@udf.setter
def udf(self, value: bool):
if value is True:
msg = bfe.format_message(
"BigFrames managed function (udf) is still under experiments. "
"It may not work and subject to change in the future."
)
warnings.warn(msg, category=bfe.PreviewWarning)
self._udf = value
75 changes: 58 additions & 17 deletions bigframes/functions/_function_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
}
)

# BQ managed functions (@udf) currently only support Python 3.11.
_MANAGED_FUNC_PYTHON_VERSION = "python-3.11"


class FunctionClient:
# Wait time (in seconds) for an IAM binding to take effect after creation.
Expand Down Expand Up @@ -193,11 +196,22 @@ def provision_bq_managed_function(
name,
packages,
is_row_processor,
*,
capture_references=False,
):
"""Create a BigQuery managed function."""
import cloudpickle

pickled = cloudpickle.dumps(func)
# TODO(b/406283812): Expose the capability to pass down
# capture_references=True in the public udf API.
if (
capture_references
and (python_version := _utils.get_python_version())
!= _MANAGED_FUNC_PYTHON_VERSION
):
raise bf_formatting.create_exception_with_feedback_link(
NotImplementedError,
f"Capturing references for udf is currently supported only in Python version {_MANAGED_FUNC_PYTHON_VERSION}, you are running {python_version}.",
)

# Create BQ managed function.
bq_function_args = []
Expand All @@ -209,13 +223,15 @@ def provision_bq_managed_function(
bq_function_args.append(f"{name_} {type_}")

managed_function_options = {
"runtime_version": _utils.get_python_version(),
"runtime_version": _MANAGED_FUNC_PYTHON_VERSION,
"entry_point": "bigframes_handler",
}

# Augment user package requirements with any internal package
# requirements.
packages = _utils._get_updated_package_requirements(packages, is_row_processor)
packages = _utils._get_updated_package_requirements(
packages, is_row_processor, capture_references
)
if packages:
managed_function_options["packages"] = packages
managed_function_options_str = self._format_function_options(
Expand All @@ -235,20 +251,45 @@ def provision_bq_managed_function(
persistent_func_id = (
f"`{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}"
)
create_function_ddl = textwrap.dedent(
f"""
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
LANGUAGE python
OPTIONS ({managed_function_options_str})
AS r'''

udf_name = func.__name__
if capture_references:
# This code path ensures that if the udf body contains any
# references to variables and/or imports outside the body, they are
# captured as well.
import cloudpickle
udf = cloudpickle.loads({pickled})
def bigframes_handler(*args):
return udf(*args)
'''
"""
).strip()

pickled = cloudpickle.dumps(func)
udf_code = textwrap.dedent(
f"""
import cloudpickle
{udf_name} = cloudpickle.loads({pickled})
"""
)
else:
# This code path handles the case where the udf body is self-contained,
# i.e. there are no references to variables or imports outside the
# body, so the function's own source code can be embedded directly.
udf_code = textwrap.dedent(inspect.getsource(func))
udf_code = udf_code[udf_code.index("def") :]

create_function_ddl = (
textwrap.dedent(
f"""
CREATE OR REPLACE FUNCTION {persistent_func_id}({','.join(bq_function_args)})
RETURNS {bq_function_return_type}
LANGUAGE python
OPTIONS ({managed_function_options_str})
AS r'''
__UDF_PLACE_HOLDER__
def bigframes_handler(*args):
return {udf_name}(*args)
'''
"""
)
.strip()
.replace("__UDF_PLACE_HOLDER__", udf_code)
)

self._ensure_dataset_exists()
self._create_bq_function(create_function_ddl)
Expand Down
27 changes: 10 additions & 17 deletions bigframes/functions/_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@

from . import _function_client, _utils

# BQ managed functions (@udf) currently only support Python 3.11.
_MANAGED_FUNC_PYTHON_VERSIONS = ("python-3.11",)


class FunctionSession:
"""Session to manage bigframes functions."""
Expand Down Expand Up @@ -758,7 +755,13 @@ def udf(
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
):
"""Decorator to turn a Python udf into a BigQuery managed function.
"""Decorator to turn a Python user defined function (udf) into a
BigQuery managed function.

.. note::
The udf must be self-contained, i.e. it must not contain any
references to an import or variable defined outside the function
body.

.. note::
Please have the following IAM roles enabled for you:
Expand Down Expand Up @@ -809,17 +812,8 @@ def udf(
of the form supported in
https://pip.pypa.io/en/stable/reference/requirements-file-format/.
"""
if not bigframes.options.experiments.udf:
raise bf_formatting.create_exception_with_feedback_link(NotImplementedError)

# Check the Python version.
python_version = _utils.get_python_version()
if python_version not in _MANAGED_FUNC_PYTHON_VERSIONS:
raise bf_formatting.create_exception_with_feedback_link(
RuntimeError,
f"Python version {python_version} is not supported yet for "
"BigFrames managed function.",
)
warnings.warn("udf is in preview.", category=bfe.PreviewWarning)

# Some defaults may be used from the session if not provided otherwise.
session = self._resolve_session(session)
Expand Down Expand Up @@ -862,7 +856,7 @@ def wrapper(func):
ValueError,
"'input_types' was not set and parameter "
f"'{parameter.name}' is missing a type annotation. "
"Types are required to use managed function.",
"Types are required to use udf.",
)
input_types.append(param_type)
elif not isinstance(input_types, collections.abc.Sequence):
Expand All @@ -875,8 +869,7 @@ def wrapper(func):
raise bf_formatting.create_exception_with_feedback_link(
ValueError,
"'output_type' was not set and function is missing a "
"return type annotation. Types are required to use "
"managed function.",
"return type annotation. Types are required to use udf",
)

# The function will actually be receiving a pandas Series, but allow
Expand Down
7 changes: 5 additions & 2 deletions bigframes/functions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,12 @@ def get_remote_function_locations(bq_location):


def _get_updated_package_requirements(
package_requirements=None, is_row_processor=False
package_requirements=None, is_row_processor=False, capture_references=True
):
requirements = [f"cloudpickle=={cloudpickle.__version__}"]
requirements = []
if capture_references:
requirements.append(f"cloudpickle=={cloudpickle.__version__}")

if is_row_processor:
# bigframes function will send an entire row of data as json, which
# would be converted to a pandas series and processed. Ensure numpy
Expand Down
1 change: 1 addition & 0 deletions bigframes/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,4 +362,5 @@ def reset_session():
"get_global_session",
"close_session",
"reset_session",
"udf",
]
8 changes: 7 additions & 1 deletion bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,13 @@ def udf(
name: Optional[str] = None,
packages: Optional[Sequence[str]] = None,
):
"""Decorator to turn a Python udf into a BigQuery managed function.
"""Decorator to turn a Python user defined function (udf) into a
BigQuery managed function.

.. note::
The udf must be self-contained, i.e. it must not contain any
references to an import or variable defined outside the function
body.

.. note::
Please have the following IAM roles enabled for you:
Expand Down
4 changes: 1 addition & 3 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@

# Cloud Run Functions supports Python versions up to 3.12
# https://cloud.google.com/run/docs/runtimes/python
# Managed Python UDF is supported only in Python 3.11
# Let's set the E2E tests version to 3.11 to cover most code paths.
E2E_TEST_PYTHON_VERSION = "3.11"
E2E_TEST_PYTHON_VERSION = "3.12"

UNIT_TEST_PYTHON_VERSIONS = ["3.9", "3.10", "3.11", "3.12", "3.13"]
UNIT_TEST_STANDARD_DEPENDENCIES = [
Expand Down
57 changes: 13 additions & 44 deletions tests/system/large/functions/test_managed_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest

import bigframes
import bigframes.exceptions as bfe
import bigframes.pandas as bpd
from tests.system.utils import cleanup_function_assets

Expand All @@ -27,8 +28,6 @@
reason="temporarily disable to debug managed udf cleanup in the test project"
)

bpd.options.experiments.udf = True


def test_managed_function_multiply_with_ibis(
session,
Expand Down Expand Up @@ -127,21 +126,12 @@ def stringify(x):
cleanup_function_assets(stringify, bigquery_client, ignore_failures=False)


@pytest.mark.parametrize(
"array_dtype",
[
bool,
int,
float,
str,
],
)
def test_managed_function_array_output(session, scalars_dfs, dataset_id, array_dtype):
def test_managed_function_array_output(session, scalars_dfs, dataset_id):
try:

@session.udf(dataset=dataset_id)
def featurize(x: int) -> list[array_dtype]: # type: ignore
return [array_dtype(i) for i in [x, x + 1, x + 2]]
def featurize(x: int) -> list[float]:
return [float(i) for i in [x, x + 1, x + 2]]

scalars_df, scalars_pandas_df = scalars_dfs

Expand All @@ -166,7 +156,7 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore

# Test on the function from read_gbq_function.
got = featurize_ref(10)
assert got == [array_dtype(i) for i in [10, 11, 12]]
assert got == [10.0, 11.0, 12.0]

bf_result_gbq = bf_int64_col.apply(featurize_ref).to_pandas()
pandas.testing.assert_series_equal(bf_result_gbq, pd_result, check_dtype=False)
Expand All @@ -176,30 +166,18 @@ def featurize(x: int) -> list[array_dtype]: # type: ignore
cleanup_function_assets(featurize, session.bqclient, ignore_failures=False)


@pytest.mark.parametrize(
("typ",),
[
pytest.param(int),
pytest.param(float),
pytest.param(bool),
pytest.param(str),
pytest.param(bytes),
],
)
def test_managed_function_series_apply(
session,
typ,
scalars_dfs,
):
try:

@session.udf()
def foo(x: int) -> typ: # type:ignore
# The bytes() constructor expects a non-negative integer as its arg.
return typ(abs(x))
def foo(x: int) -> bytes:
return bytes(abs(x))

# Function should still work normally.
assert foo(-2) == typ(2)
assert foo(-2) == bytes(2)

assert hasattr(foo, "bigframes_bigquery_function")
assert hasattr(foo, "ibis_node")
Expand Down Expand Up @@ -243,26 +221,17 @@ def foo(x: int) -> typ: # type:ignore
cleanup_function_assets(foo, session.bqclient, ignore_failures=False)


@pytest.mark.parametrize(
("typ",),
[
pytest.param(int),
pytest.param(float),
pytest.param(bool),
pytest.param(str),
],
)
def test_managed_function_series_apply_array_output(
session,
typ,
scalars_dfs,
):
try:

@session.udf()
def foo_list(x: int) -> list[typ]: # type:ignore
# The bytes() constructor expects a non-negative integer as its arg.
return [typ(abs(x)), typ(abs(x) + 1)]
with pytest.warns(bfe.PreviewWarning, match="udf is in preview."):

@session.udf()
def foo_list(x: int) -> list[float]:
return [float(abs(x)), float(abs(x) + 1)]

scalars_df, scalars_pandas_df = scalars_dfs

Expand Down
15 changes: 0 additions & 15 deletions tests/unit/_config/test_experiment_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,3 @@ def test_blob_set_true_shows_warning():
options.blob = True

assert options.blob is True


def test_udf_default_false():
options = experiment_options.ExperimentOptions()

assert options.udf is False


def test_udf_set_true_shows_warning():
options = experiment_options.ExperimentOptions()

with pytest.warns(bfe.PreviewWarning):
options.udf = True

assert options.udf is True