diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py index 8d1ca38e62..3338c0389e 100644 --- a/bigframes/blob/_functions.py +++ b/bigframes/blob/_functions.py @@ -21,7 +21,13 @@ import bigframes.session import bigframes.session._io.bigquery as bf_io_bigquery -_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} +_PYTHON_TO_BQ_TYPES = { + int: "INT64", + float: "FLOAT64", + str: "STRING", + bytes: "BYTES", + bool: "BOOL", +} @dataclass(frozen=True) @@ -112,7 +118,7 @@ def udf(self): return self._session.read_gbq_function(udf_name) -def exif_func(src_obj_ref_rt: str) -> str: +def exif_func(src_obj_ref_rt: str, verbose: bool) -> str: import io import json @@ -120,25 +126,36 @@ def exif_func(src_obj_ref_rt: str) -> str: import requests from requests import adapters - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + result_dict = {"status": "", "content": "{}"} + try: + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - response = session.get(src_url, timeout=30) - bts = response.content + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - image = Image.open(io.BytesIO(bts)) - exif_data = image.getexif() - exif_dict = {} - if exif_data: - for tag, value in exif_data.items(): - tag_name = ExifTags.TAGS.get(tag, tag) - exif_dict[tag_name] = value + response = session.get(src_url, timeout=30) + bts = response.content + + image = Image.open(io.BytesIO(bts)) + exif_data = image.getexif() + exif_dict = {} + if exif_data: + for tag, value in exif_data.items(): + tag_name = ExifTags.TAGS.get(tag, tag) + # Pillow might return bytes, which are not serializable. + if isinstance(value, bytes): + value = value.decode("utf-8", "replace") + exif_dict[tag_name] = value + result_dict["content"] = json.dumps(exif_dict) + except Exception as e: + result_dict["status"] = str(e) - return json.dumps(exif_dict) + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] exif_func_def = FunctionDef(exif_func, ["pillow", "requests"]) @@ -146,82 +163,109 @@ def exif_func(src_obj_ref_rt: str) -> str: # Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. def image_blur_func( - src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str + src_obj_ref_rt: str, + dst_obj_ref_rt: str, + ksize_x: int, + ksize_y: int, + ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + bts = cv.imencode(ext, img_blurred)[1].tobytes() - bts = cv.imencode(ext, img_blurred)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) def image_blur_to_bytes_func( - src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str -) -> bytes: + src_obj_ref_rt: str, ksize_x: int, ksize_y: int, ext: str, verbose: bool +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + status = "" + content = b"" + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + content = cv.imencode(ext, img_blurred)[1].tobytes() - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) - bts = cv.imencode(ext, img_blurred)[1].tobytes() + except Exception as e: + status = str(e) - return bts + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_blur_to_bytes_def = FunctionDef( @@ -237,49 +281,59 @@ def image_resize_func( fx: float, fy: float, ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + bts = cv.imencode(ext, img_resized)[1].tobytes() - bts = cv.imencode(ext, img_resized)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_def = FunctionDef( @@ -294,31 +348,45 @@ def image_resize_to_bytes_func( fx: float, fy: float, ext: str, -) -> bytes: + verbose: bool, +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + status = "" + content = b"" + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) + content = cv.imencode(".jpeg", img_resized)[1].tobytes() - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_resized = cv.resize(img, dsize=(dsize_x, dsize_y), fx=fx, fy=fy) - bts = cv.imencode(".jpeg", img_resized)[1].tobytes() + except Exception as e: + status = str(e) - return bts + encoded_content = base64.b64encode(content).decode("utf-8") + result_dict = {"status": status, "content": encoded_content} + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_resize_to_bytes_def = FunctionDef( @@ -333,58 +401,68 @@ def image_normalize_func( beta: float, norm_type: str, ext: str, + verbose: bool, ) -> str: import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": dst_obj_ref_rt} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] - dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + response = session.get(src_url, timeout=30) + bts = response.content - response = session.get(src_url, timeout=30) - bts = response.content + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) + bts = cv.imencode(ext, img_normalized)[1].tobytes() - bts = cv.imencode(ext, img_normalized)[1].tobytes() + ext = ext.replace(".", "") + ext_mappings = {"jpg": "jpeg", "tif": "tiff"} + ext = ext_mappings.get(ext, ext) + content_type = "image/" + ext - ext = ext.replace(".", "") - ext_mappings = {"jpg": "jpeg", "tif": "tiff"} - ext = ext_mappings.get(ext, ext) - content_type = "image/" + ext + session.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": content_type, + }, + timeout=30, + ) - session.put( - url=dst_url, - data=bts, - headers={ - "Content-Type": content_type, - }, - timeout=30, - ) + except Exception as e: + result_dict["status"] = str(e) - return dst_obj_ref_rt + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_def = FunctionDef( @@ -393,41 +471,59 @@ def image_normalize_func( def image_normalize_to_bytes_func( - src_obj_ref_rt: str, alpha: float, beta: float, norm_type: str, ext: str -) -> bytes: + src_obj_ref_rt: str, + alpha: float, + beta: float, + norm_type: str, + ext: str, + verbose: bool, +) -> str: + import base64 import json - import cv2 as cv # type: ignore - import numpy as np - import requests - from requests import adapters + result_dict = {"status": "", "content": ""} + + try: + import cv2 as cv # type: ignore + import numpy as np + import requests + from requests import adapters + + session = requests.Session() + session.mount("https://", adapters.HTTPAdapter(max_retries=3)) - session = requests.Session() - session.mount("https://", adapters.HTTPAdapter(max_retries=3)) + ext = ext or ".jpeg" - ext = ext or ".jpeg" + norm_type_mapping = { + "inf": cv.NORM_INF, + "l1": cv.NORM_L1, + "l2": cv.NORM_L2, + "minmax": cv.NORM_MINMAX, + } - norm_type_mapping = { - "inf": cv.NORM_INF, - "l1": cv.NORM_L1, - "l2": cv.NORM_L2, - "minmax": cv.NORM_MINMAX, - } + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + + response = session.get(src_url, timeout=30) + bts = response.content - src_obj_ref_rt_json = json.loads(src_obj_ref_rt) - src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_normalized = cv.normalize( + img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] + ) + bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() - response = session.get(src_url, timeout=30) - bts = response.content + content_b64 = base64.b64encode(bts).decode("utf-8") + result_dict["content"] = content_b64 - nparr = np.frombuffer(bts, np.uint8) - img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) - img_normalized = cv.normalize( - img, None, alpha=alpha, beta=beta, norm_type=norm_type_mapping[norm_type] - ) - bts = cv.imencode(".jpeg", img_normalized)[1].tobytes() + except Exception as e: + result_dict["status"] = str(e) - return bts + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] image_normalize_to_bytes_def = FunctionDef( @@ -436,7 +532,7 @@ def image_normalize_to_bytes_func( # Extracts all text from a PDF url -def pdf_extract_func(src_obj_ref_rt: str) -> str: +def pdf_extract_func(src_obj_ref_rt: str, verbose: bool) -> str: try: import io import json @@ -469,8 +565,10 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: except Exception as e: result_dict = {"status": str(e), "content": ""} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return result_dict["content"] pdf_extract_def = FunctionDef( @@ -479,7 +577,9 @@ def pdf_extract_func(src_obj_ref_rt: str) -> str: # Extracts text from a PDF url and chunks it simultaneously -def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> str: +def pdf_chunk_func( + src_obj_ref_rt: str, chunk_size: int, overlap_size: int, verbose: bool +) -> str: try: import io import json @@ -525,8 +625,10 @@ def pdf_chunk_func(src_obj_ref_rt: str, chunk_size: int, overlap_size: int) -> s except Exception as e: result_dict = {"status": str(e), "content": []} - result_json = json.dumps(result_dict) - return result_json + if verbose: + return json.dumps(result_dict) + else: + return json.dumps(result_dict["content"]) pdf_chunk_def = FunctionDef( diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index 63875ded99..a0750c050b 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -308,6 +308,7 @@ def exif( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Extract EXIF data. Now only support image types. @@ -317,18 +318,21 @@ def exif( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: JSON series of key-value pairs. + bigframes.series.Series: JSON series of key-value pairs if verbose=False, or struct with status and content if verbose=True. """ if engine is None or engine.casefold() != "pillow": raise ValueError("Must specify the engine, supported value is 'pillow'.") import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose exif_udf = blob_func.TransformFunction( blob_func.exif_func_def, @@ -340,9 +344,21 @@ def exif( ).udf() res = self._df_apply_udf(df, exif_udf) - res = bbq.parse_json(res) - return res + if verbose: + exif_content_series = bbq.parse_json( + res._apply_unary_op(ops.JSONValue(json_path="$.content")) + ).rename("exif_content") + exif_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": exif_status_series, "content": exif_content_series} + ) + results_struct = bbq.struct(results_df).rename("exif_results") + return results_struct + else: + return bbq.parse_json(res) def image_blur( self, @@ -354,6 +370,7 @@ def image_blur( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Blurs images. @@ -369,14 +386,17 @@ def image_blur( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -395,9 +415,29 @@ def image_blur( df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) - return res + if verbose: + blurred_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + blurred_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[blurred_content_b64_series] + ) + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": blurred_content_series} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + blurred_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("blurred_bytes") + return blurred_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -423,11 +463,24 @@ def image_blur( df = df.join(dst_rt, how="outer") df["ksize_x"], df["ksize_y"] = ksize df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_blur_udf) res.cache() # to execute the udf - return dst + if verbose: + blurred_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) + results_df = bpd.DataFrame( + {"status": blurred_status_series, "content": dst_blobs} + ) + results_struct = bbq.struct(results_df).rename("blurred_results") + return results_struct + else: + return res.str.to_blob(connection=connection) def image_resize( self, @@ -441,6 +494,7 @@ def image_resize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ): """Resize images. @@ -458,9 +512,10 @@ def image_resize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") @@ -472,7 +527,9 @@ def image_resize( "Only one of dsize or (fx, fy) parameters must be set. And the set values must be positive. " ) + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -492,9 +549,30 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) - return res + if verbose: + resized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + resized_content_series = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[resized_content_b64_series] + ) + + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": resized_status_series, "content": resized_content_series} + ) + results_struct = bbq.struct(results_df).rename("resized_results") + return results_struct + else: + resized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("resized_bytes") + return resized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -521,11 +599,24 @@ def image_resize( df["dsize_x"], df["dsizye_y"] = dsize df["fx"], df["fy"] = fx, fy df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_resize_udf) res.cache() # to execute the udf - return dst + if verbose: + resized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) + results_df = bpd.DataFrame( + {"status": resized_status_series, "content": dst_blobs} + ) + results_struct = bbq.struct(results_df).rename("resized_results") + return results_struct + else: + return res.str.to_blob(connection=connection) def image_normalize( self, @@ -539,6 +630,7 @@ def image_normalize( max_batching_rows: int = 8192, container_cpu: Union[float, int] = 0.33, container_memory: str = "512Mi", + verbose: bool = False, ) -> bigframes.series.Series: """Normalize images. @@ -556,14 +648,17 @@ def image_normalize( max_batching_rows (int, default 8,192): Max number of rows per batch send to cloud run to execute the function. container_cpu (int or float, default 0.33): number of container CPUs. Possible values are [0.33, 8]. Floats larger than 1 are cast to intergers. container_memory (str, default "512Mi"): container memory size. String of the format . Possible values are from 512Mi to 32Gi. + verbose (bool, default False): If True, returns a struct with status and content fields. If False, returns only the content. Returns: - bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. + bigframes.series.Series: blob Series if destination is GCS. Or bytes Series if destination is BQ. If verbose=True, returns struct with status and content. """ if engine is None or engine.casefold() != "opencv": raise ValueError("Must specify the engine, supported value is 'opencv'.") + import bigframes.bigquery as bbq import bigframes.blob._functions as blob_func + import bigframes.pandas as bpd connection = self._resolve_connection(connection) df = self.get_runtime_json_str(mode="R").to_frame() @@ -584,9 +679,29 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) - return res + if verbose: + normalized_content_b64_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[normalized_content_b64_series] + ) + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + results_df = bpd.DataFrame( + {"status": normalized_status_series, "content": normalized_bytes} + ) + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct + else: + normalized_bytes = bbq.sql_scalar( + "FROM_BASE64({0})", columns=[res] + ).rename("normalized_bytes") + return normalized_bytes if isinstance(dst, str): dst = os.path.join(dst, "") @@ -614,11 +729,27 @@ def image_normalize( df["beta"] = beta df["norm_type"] = norm_type df["ext"] = ext # type: ignore + df["verbose"] = verbose res = self._df_apply_udf(df, image_normalize_udf) res.cache() # to execute the udf - return dst + if verbose: + normalized_status_series = res._apply_unary_op( + ops.JSONValue(json_path="$.status") + ) + content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + dst_blobs = content_series.str.to_blob(connection=connection) + results_df = bpd.DataFrame( + { + "status": normalized_status_series, + "content": dst_blobs, + } + ) + results_struct = bbq.struct(results_df).rename("normalized_results") + return results_struct + else: + return res.str.to_blob(connection=connection) def pdf_extract( self, @@ -670,19 +801,22 @@ def pdf_extract( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - - res = src_rt.apply(pdf_extract_udf) - - content_series = res._apply_unary_op(ops.JSONValue(json_path="$.content")) + df = self.get_runtime_json_str(mode="R").to_frame() + df["verbose"] = verbose + res = self._df_apply_udf(df, pdf_extract_udf) if verbose: + extracted_content_series = res._apply_unary_op( + ops.JSONValue(json_path="$.content") + ) status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df) - return struct_series + results_df = bpd.DataFrame( + {"status": status_series, "content": extracted_content_series} + ) + results_struct = bbq.struct(results_df).rename("extracted_results") + return results_struct else: - return content_series + return res.rename("extracted_content") def pdf_chunk( self, @@ -749,21 +883,23 @@ def pdf_chunk( container_memory=container_memory, ).udf() - src_rt = self.get_runtime_json_str(mode="R") - df = src_rt.to_frame() + df = self.get_runtime_json_str(mode="R").to_frame() df["chunk_size"] = chunk_size df["overlap_size"] = overlap_size + df["verbose"] = verbose res = self._df_apply_udf(df, pdf_chunk_udf) - content_series = bbq.json_extract_string_array(res, "$.content") if verbose: + chunked_content_series = bbq.json_extract_string_array(res, "$.content") status_series = res._apply_unary_op(ops.JSONValue(json_path="$.status")) - res_df = bpd.DataFrame({"status": status_series, "content": content_series}) - struct_series = bbq.struct(res_df) - return struct_series + results_df = bpd.DataFrame( + {"status": status_series, "content": chunked_content_series} + ) + resultes_struct = bbq.struct(results_df).rename("chunked_results") + return resultes_struct else: - return content_series + return bbq.json_extract_string_array(res, "$").rename("chunked_content") def audio_transcribe( self, @@ -827,7 +963,7 @@ def audio_transcribe( transcribed_content_series = cast( bpd.Series, transcribed_results["ml_generate_text_llm_result"] - ).rename("transcribed_content") + ) if verbose: transcribed_status_series = cast( @@ -842,4 +978,4 @@ def audio_transcribe( results_struct = bbq.struct(results_df).rename("transcription_results") return results_struct else: - return transcribed_content_series + return transcribed_content_series.rename("transcribed_content") diff --git a/tests/system/large/blob/test_function.py b/tests/system/large/blob/test_function.py index c8fa63d493..74a1eaaaeb 100644 --- a/tests/system/large/blob/test_function.py +++ b/tests/system/large/blob/test_function.py @@ -51,9 +51,11 @@ def images_output_uris(images_output_folder: str) -> list[str]: ] +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_exif( bq_connection: str, session: bigframes.Session, + verbose: bool, ): exif_image_df = session.from_glob_path( "gs://bigframes_blob_test/images_exif/*", @@ -62,166 +64,294 @@ def test_blob_exif( ) actual = exif_image_df["blob_col"].blob.exif( - engine="pillow", connection=bq_connection - ) - expected = bpd.Series( - ['{"ExifOffset": 47, "Make": "MyCamera"}'], - session=session, - dtype=dtypes.JSON_DTYPE, - ) - pd.testing.assert_series_equal( - actual.to_pandas(), - expected.to_pandas(), - check_dtype=False, - check_index_type=False, + engine="pillow", connection=bq_connection, verbose=verbose ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.JSON_DTYPE + + else: + expected = bpd.Series( + ['{"ExifOffset": 47, "Make": "MyCamera"}'], + session=session, + dtype=dtypes.JSON_DTYPE, + ) + pd.testing.assert_series_equal( + actual.to_pandas(), + expected.to_pandas(), + check_dtype=False, + check_index_type=False, + ) + + +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_blur_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection ) actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=series, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (8, 8), dst=series, connection=bq_connection, engine="opencv", verbose=verbose ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_blur_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), dst=images_output_folder, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (8, 8), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -def test_blob_image_blur_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_blur_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_blur( - (8, 8), connection=bq_connection, engine="opencv" + (8, 8), connection=bq_connection, engine="opencv", verbose=verbose ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + + else: + assert actual.dtype == dtypes.BYTES_DTYPE + + +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection ) actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=series, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (200, 300), + dst=series, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_resize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), dst=images_output_folder, connection=bq_connection, engine="opencv" - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + (200, 300), + dst=images_output_folder, + connection=bq_connection, + engine="opencv", + verbose=verbose, ) + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + # verify the files exist assert not actual.blob.size().isna().any() -def test_blob_image_resize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_resize_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_resize( - (200, 300), connection=bq_connection, engine="opencv" + (200, 300), connection=bq_connection, engine="opencv", verbose=verbose ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + else: + assert actual.dtype == dtypes.BYTES_DTYPE + + +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_series( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_uris: list[str], session: bigframes.Session, + verbose: bool, ): series = bpd.Series(images_output_uris, session=session).str.to_blob( connection=bq_connection @@ -234,31 +364,50 @@ def test_blob_image_normalize_to_series( dst=series, connection=bq_connection, engine="opencv", - ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, + verbose=verbose, ) - # verify the files exist - assert not actual.blob.size().isna().any() + if verbose: + + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) + + # verify the files exist + assert not actual.blob.size().isna().any() + + +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_image_normalize_to_folder( images_mm_df: bpd.DataFrame, bq_connection: str, images_output_folder: str, images_output_uris: list[str], + verbose: bool, ): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, @@ -267,47 +416,74 @@ def test_blob_image_normalize_to_folder( dst=images_output_folder, connection=bq_connection, engine="opencv", + verbose=verbose, ) - expected_df = pd.DataFrame( - { - "uri": images_output_uris, - "version": [None, None], - "authorizer": [bq_connection.casefold(), bq_connection.casefold()], - "details": [None, None], - } - ) - pd.testing.assert_frame_equal( - actual.struct.explode().to_pandas(), - expected_df, - check_dtype=False, - check_index_type=False, - ) + + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns + + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + # Content should be blob objects for GCS destination + assert hasattr(content_series, "blob") + + else: + expected_df = pd.DataFrame( + { + "uri": images_output_uris, + "version": [None, None], + "authorizer": [bq_connection.casefold(), bq_connection.casefold()], + "details": [None, None], + } + ) + pd.testing.assert_frame_equal( + actual.struct.explode().to_pandas(), + expected_df, + check_dtype=False, + check_index_type=False, + ) # verify the files exist assert not actual.blob.size().isna().any() -def test_blob_image_normalize_to_bq(images_mm_df: bpd.DataFrame, bq_connection: str): +@pytest.mark.parametrize("verbose", [True, False]) +def test_blob_image_normalize_to_bq( + images_mm_df: bpd.DataFrame, bq_connection: str, verbose: bool +): actual = images_mm_df["blob_col"].blob.image_normalize( alpha=50.0, beta=150.0, norm_type="minmax", connection=bq_connection, engine="opencv", + verbose=verbose, ) assert isinstance(actual, bpd.Series) assert len(actual) == 2 - assert actual.dtype == dtypes.BYTES_DTYPE + if verbose: + assert hasattr(actual, "struct") + actual_exploded = actual.struct.explode() + assert "status" in actual_exploded.columns + assert "content" in actual_exploded.columns -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) + status_series = actual_exploded["status"] + assert status_series.dtype == dtypes.STRING_DTYPE + + content_series = actual_exploded["content"] + assert content_series.dtype == dtypes.BYTES_DTYPE + else: + assert actual.dtype == dtypes.BYTES_DTYPE + + +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_pdf_extract( pdf_mm_df: bpd.DataFrame, verbose: bool, @@ -350,13 +526,7 @@ def test_blob_pdf_extract( ), f"Item (verbose={verbose}): Expected keyword '{keyword}' not found in extracted text. " -@pytest.mark.parametrize( - "verbose", - [ - (True), - (False), - ], -) +@pytest.mark.parametrize("verbose", [True, False]) def test_blob_pdf_chunk(pdf_mm_df: bpd.DataFrame, verbose: bool, bq_connection: str): actual = ( pdf_mm_df["pdf"]