Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bigframes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ class FunctionConflictTypeHintWarning(UserWarning):

class FunctionPackageVersionWarning(PreviewWarning):
"""
Managed UDF package versions for Numpy, Pandas, and Pyarrow may not
precisely match users' local environment or the exact versions specified.
Warns that package versions in remote function or managed function may not
match local or specified versions, which might cause unexpected behavior.
"""


Expand Down
26 changes: 20 additions & 6 deletions bigframes/functions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import google.api_core.exceptions
from google.cloud import bigquery, functions_v2
import numpy
from packaging.requirements import Requirement
import pandas
import pyarrow

Expand Down Expand Up @@ -63,6 +64,16 @@ def get_remote_function_locations(bq_location):
return bq_location, cloud_function_region


def _package_existed(package_requirements: list[str], package: str) -> bool:
"""Checks if a package (regardless of version) exists in a given list."""
if not package_requirements:
return False

return Requirement(package).name in {
Requirement(req).name for req in package_requirements
}


def get_updated_package_requirements(
package_requirements=None,
is_row_processor=False,
Expand Down Expand Up @@ -96,13 +107,16 @@ def get_updated_package_requirements(
requirements.append(f"pyarrow=={pyarrow.__version__}")
requirements.append(f"numpy=={numpy.__version__}")

# TODO(b/435023957): Fix the issue of potential duplicate package versions
# when `package_requirements` also contains `pandas/pyarrow/numpy`.
if package_requirements:
requirements.extend(package_requirements)
if not requirements:
return package_requirements

if not package_requirements:
package_requirements = []
for package in requirements:
if not _package_existed(package_requirements, package):
package_requirements.append(package)

requirements = sorted(requirements)
return requirements
return sorted(package_requirements)


def clean_up_by_session_id(
Expand Down
149 changes: 149 additions & 0 deletions tests/unit/functions/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import patch

from bigframes.functions._utils import (
_package_existed,
get_updated_package_requirements,
)


def test_get_updated_package_requirements_no_extra_package():
"""Tests with no extra package."""
result = get_updated_package_requirements(capture_references=False)
assert result is None

initial_packages = ["xgboost"]
result = get_updated_package_requirements(
initial_packages, capture_references=False
)
assert result == initial_packages


@patch("bigframes.functions._utils.numpy.__version__", "1.24.4")
@patch("bigframes.functions._utils.pyarrow.__version__", "14.0.1")
@patch("bigframes.functions._utils.pandas.__version__", "2.0.3")
@patch("bigframes.functions._utils.cloudpickle.__version__", "2.2.1")
def test_get_updated_package_requirements_is_row_processor_with_versions():
"""Tests with is_row_processor=True and specific versions."""
expected = [
"cloudpickle==2.2.1",
"numpy==1.24.4",
"pandas==2.0.3",
"pyarrow==14.0.1",
]
result = get_updated_package_requirements(is_row_processor=True)
assert result == expected


@patch("bigframes.functions._utils.warnings.warn")
@patch("bigframes.functions._utils.cloudpickle.__version__", "2.2.1")
def test_get_updated_package_requirements_ignore_version(mock_warn):
"""
Tests with is_row_processor=True and ignore_package_version=True.
Should add packages without versions and raise a warning.
"""
expected = ["cloudpickle==2.2.1", "numpy", "pandas", "pyarrow"]
result = get_updated_package_requirements(
is_row_processor=True, ignore_package_version=True
)
assert result == expected
# Verify that a warning was issued.
mock_warn.assert_called_once()


@patch("bigframes.functions._utils.numpy.__version__", "1.24.4")
@patch("bigframes.functions._utils.pyarrow.__version__", "14.0.1")
@patch("bigframes.functions._utils.pandas.__version__", "2.0.3")
def test_get_updated_package_requirements_capture_references_false():
"""
Tests with capture_references=False.
Should not add cloudpickle but should add others if requested.
"""
# Case 1: Only capture_references=False.
result_1 = get_updated_package_requirements(capture_references=False)
assert result_1 is None

# Case 2: capture_references=False but is_row_processor=True.
expected_2 = ["numpy==1.24.4", "pandas==2.0.3", "pyarrow==14.0.1"]
result_2 = get_updated_package_requirements(
is_row_processor=True, capture_references=False
)
assert result_2 == expected_2


@patch("bigframes.functions._utils.numpy.__version__", "1.24.4")
@patch("bigframes.functions._utils.pyarrow.__version__", "14.0.1")
@patch("bigframes.functions._utils.pandas.__version__", "2.0.3")
@patch("bigframes.functions._utils.cloudpickle.__version__", "2.2.1")
def test_get_updated_package_requirements_non_overlapping_packages():
"""Tests providing an initial list of packages that do not overlap."""
initial_packages = ["scikit-learn==1.3.0", "xgboost"]
expected = [
"cloudpickle==2.2.1",
"numpy==1.24.4",
"pandas==2.0.3",
"pyarrow==14.0.1",
"scikit-learn==1.3.0",
"xgboost",
]
result = get_updated_package_requirements(
package_requirements=initial_packages, is_row_processor=True
)
assert result == expected


@patch("bigframes.functions._utils.numpy.__version__", "1.24.4")
@patch("bigframes.functions._utils.pyarrow.__version__", "14.0.1")
@patch("bigframes.functions._utils.pandas.__version__", "2.0.3")
@patch("bigframes.functions._utils.cloudpickle.__version__", "2.2.1")
def test_get_updated_package_requirements_overlapping_packages():
"""Tests that packages are not added if they already exist."""
# The function should respect the pre-existing pandas version.
initial_packages = ["pandas==1.5.3", "numpy"]
expected = [
"cloudpickle==2.2.1",
"numpy",
"pandas==1.5.3",
"pyarrow==14.0.1",
]
result = get_updated_package_requirements(
package_requirements=initial_packages, is_row_processor=True
)
assert result == expected


@patch("bigframes.functions._utils.cloudpickle.__version__", "2.2.1")
def test_get_updated_package_requirements_with_existing_cloudpickle():
"""Tests that cloudpickle is not added if it already exists."""
initial_packages = ["cloudpickle==2.0.0"]
expected = ["cloudpickle==2.0.0"]
result = get_updated_package_requirements(package_requirements=initial_packages)
assert result == expected


def test_package_existed_helper():
"""Tests the _package_existed helper function directly."""
reqs = ["pandas==1.0", "numpy", "scikit-learn>=1.2.0"]
# Exact match
assert _package_existed(reqs, "pandas==1.0")
# Different version
assert _package_existed(reqs, "pandas==2.0")
# No version specified
assert _package_existed(reqs, "numpy")
# Not in list
assert not _package_existed(reqs, "xgboost")
# Empty list
assert not _package_existed([], "pandas")