From 8ae2d70857005bf18973660d1fc80bd1168153ea Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Wed, 30 Jul 2025 02:57:49 +0000
Subject: [PATCH 1/8] update benchmarks to use the total_rows parameter

---
 scripts/run_and_publish_benchmark.py | 27 +++++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/scripts/run_and_publish_benchmark.py b/scripts/run_and_publish_benchmark.py
index 248322f619..78f9f4f29b 100644
--- a/scripts/run_and_publish_benchmark.py
+++ b/scripts/run_and_publish_benchmark.py
@@ -89,6 +89,7 @@ def collect_benchmark_result(
     bq_seconds_files = sorted(path.rglob("*.bq_exec_time_seconds"))
     local_seconds_files = sorted(path.rglob("*.local_exec_time_seconds"))
     query_char_count_files = sorted(path.rglob("*.query_char_count"))
+    total_rows_files = sorted(path.rglob("*.totalrows"))
 
     error_files = sorted(path.rglob("*.error"))
 
@@ -98,6 +99,7 @@ def collect_benchmark_result(
         <= len(bytes_files)
         == len(query_char_count_files)
         == len(local_seconds_files)
+        == len(total_rows_files)
     ):
         raise ValueError(
             "Mismatch in the number of report files for bytes, millis, seconds and query char count: \n"
@@ -106,6 +108,7 @@ def collect_benchmark_result(
             f"bytes_files: {len(bytes_files)}\n"
             f"query_char_count_files: {len(query_char_count_files)}\n"
             f"local_seconds_files: {len(local_seconds_files)}\n"
+            f"total_rows_files: {len(total_rows_files)}\n"
         )
 
     has_full_metrics = len(bq_seconds_files) == len(local_seconds_files)
@@ -138,14 +141,18 @@ def collect_benchmark_result(
         if not has_full_metrics:
             total_slot_millis = None
             bq_seconds = None
+            total_rows = None
         else:
             millis_file = millis_files[idx]
             bq_seconds_file = bq_seconds_files[idx]
-            if filename != millis_file.relative_to(path).with_suffix(
-                ""
-            ) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
+            total_rows_file = total_rows_files[idx]
+            if (
+                filename != millis_file.relative_to(path).with_suffix("")
+                or filename != bq_seconds_file.relative_to(path).with_suffix("")
+                or filename != total_rows_file.relative_to(path).with_suffix("")
+            ):
                 raise ValueError(
-                    "File name mismatch among query_char_count, bytes, millis, and seconds reports."
+                    "File name mismatch among query_char_count, bytes, millis, seconds and total_rows reports."
                 )
 
             with open(millis_file, "r") as file:
@@ -156,6 +163,10 @@ def collect_benchmark_result(
                 lines = file.read().splitlines()
                 bq_seconds = sum(float(line) for line in lines) / iterations
 
+            with open(total_rows_file, "r") as file:
+                lines = file.read().splitlines()
+                total_rows = sum(int(line) for line in lines) / iterations
+
         results_dict[str(filename)] = [
             query_count,
             total_bytes,
@@ -163,6 +174,7 @@ def collect_benchmark_result(
             local_seconds,
             bq_seconds,
             query_char_count,
+            total_rows,
         ]
     finally:
         for files_to_remove in (
@@ -171,6 +183,7 @@ def collect_benchmark_result(
             path.rglob("*.local_exec_time_seconds"),
             path.rglob("*.bq_exec_time_seconds"),
             path.rglob("*.query_char_count"),
+            path.rglob("*.totalrows"),
             path.rglob("*.error"),
         ):
             for log_file in files_to_remove:
@@ -183,6 +196,7 @@ def collect_benchmark_result(
         "Local_Execution_Time_Sec",
         "BigQuery_Execution_Time_Sec",
         "Query_Char_Count",
+        "Total_Rows",
     ]
 
     benchmark_metrics = pd.DataFrame.from_dict(
@@ -206,6 +220,7 @@ def collect_benchmark_result(
         print(
             f"{index} - query count: {row['Query_Count']},"
             + f" query char count: {row['Query_Char_Count']},"
+            + f" total rows: {row['Total_Rows']},"
             + f" bytes processed sum: {row['Bytes_Processed']},"
             + (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
             + f" local execution time: {formatted_local_exec_time} seconds"
@@ -234,10 +249,14 @@ def collect_benchmark_result(
     geometric_mean_bq_seconds = geometric_mean_excluding_zeros(
         benchmark_metrics["BigQuery_Execution_Time_Sec"]
     )
+    geometric_mean_total_rows = geometric_mean_excluding_zeros(
+        benchmark_metrics["Total_Rows"]
+    )
 
     print(
         f"---Geometric mean of queries: {geometric_mean_queries},"
         + f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
+        + f" Geometric mean of total rows: {geometric_mean_total_rows},"
        + f" Geometric mean of bytes processed: {geometric_mean_bytes},"
         + (
             f" Geometric mean of slot millis: {geometric_mean_slot_millis},"

From 7037586b13e096ee626b2b1001afd5e27e8d231e Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Wed, 30 Jul 2025 03:36:34 +0000
Subject: [PATCH 2/8] add a testcase

---
 .../small/test_run_and_publish_benchmark.py   | 41 +++++++++++++++++++
 1 file changed, 41 insertions(+)
 create mode 100644 tests/system/small/test_run_and_publish_benchmark.py

diff --git a/tests/system/small/test_run_and_publish_benchmark.py b/tests/system/small/test_run_and_publish_benchmark.py
new file mode 100644
index 0000000000..78a0d1c2ea
--- /dev/null
+++ b/tests/system/small/test_run_and_publish_benchmark.py
@@ -0,0 +1,41 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from scripts import run_and_publish_benchmark
+
+
+def test_collect_benchmark_result(tmp_path):
+    # Create dummy log files
+    (tmp_path / "benchmark1.bytesprocessed").write_text("100")
+    (tmp_path / "benchmark1.slotmillis").write_text("1000")
+    (tmp_path / "benchmark1.bq_exec_time_seconds").write_text("1.0")
+    (tmp_path / "benchmark1.local_exec_time_seconds").write_text("2.0")
+    (tmp_path / "benchmark1.query_char_count").write_text("50")
+    (tmp_path / "benchmark1.totalrows").write_text("10")
+
+    # Collect the benchmark results
+    df, error_message = run_and_publish_benchmark.collect_benchmark_result(
+        str(tmp_path), 1
+    )
+
+    # Assert that the DataFrame is correct
+    assert error_message is None
+    assert len(df) == 1
+    assert df["Benchmark_Name"][0] == "benchmark1"
+    assert df["Bytes_Processed"][0] == 100
+    assert df["Slot_Millis"][0] == 1000
+    assert df["BigQuery_Execution_Time_Sec"][0] == 1.0
+    assert df["Local_Execution_Time_Sec"][0] == 2.0
+    assert df["Query_Char_Count"][0] == 50
+    assert df["Total_Rows"][0] == 10

From 5e7a617652e877f1d0a637c0093443721ea4cbd6 Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Wed, 30 Jul 2025 04:53:54 +0000
Subject: [PATCH 3/8] stop checking pandas_gbq typing

---
 mypy.ini | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/mypy.ini b/mypy.ini
index 7709eb200a..69f0e260cf 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -44,3 +44,6 @@ ignore_missing_imports = True
 
 [mypy-anywidget]
 ignore_missing_imports = True
+
+[mypy-pandas_gbq]
+ignore_missing_imports = True

From 4954813f594a0ac4a32de4cbb485194fd026ec41 Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Wed, 6 Aug 2025 22:32:33 +0000
Subject: [PATCH 4/8] fix test

---
 tests/system/small/test_run_and_publish_benchmark.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/tests/system/small/test_run_and_publish_benchmark.py b/tests/system/small/test_run_and_publish_benchmark.py
index 78a0d1c2ea..d6d185675d 100644
--- a/tests/system/small/test_run_and_publish_benchmark.py
+++ b/tests/system/small/test_run_and_publish_benchmark.py
@@ -12,8 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from pathlib import Path
+import sys
+
 from scripts import run_and_publish_benchmark
 
+sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
+

From 17249a6503353606d44bd094630124feb9776d30 Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Wed, 6 Aug 2025 22:57:30 +0000
Subject: [PATCH 5/8] final touch up

---
 .../small/test_run_and_publish_benchmark.py   | 27 +++++++++++++------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/tests/system/small/test_run_and_publish_benchmark.py b/tests/system/small/test_run_and_publish_benchmark.py
index d6d185675d..df81e390e3 100644
--- a/tests/system/small/test_run_and_publish_benchmark.py
+++ b/tests/system/small/test_run_and_publish_benchmark.py
@@ -15,13 +15,23 @@
 from pathlib import Path
 import sys
 
-from scripts import run_and_publish_benchmark
-
+# Add the project root to the Python path to allow for application-specific imports.
 sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
 
+from scripts import run_and_publish_benchmark  # noqa: E402
+
+
+def test_collect_benchmark_result(tmp_path: Path):
+    """Tests the collect_benchmark_result function.
+
+    This test verifies that the function correctly reads benchmark result
+    files from a specified directory, processes them, and returns a
+    pandas DataFrame with the expected data and types.
 
-def test_collect_benchmark_result(tmp_path):
-    # Create dummy log files
+
+    Args:
+        tmp_path (Path): The pytest fixture providing a temporary directory path.
+    """
+    # Arrange: Create dummy log files with benchmark data.
     (tmp_path / "benchmark1.bytesprocessed").write_text("100")
     (tmp_path / "benchmark1.slotmillis").write_text("1000")
     (tmp_path / "benchmark1.bq_exec_time_seconds").write_text("1.0")
@@ -29,14 +39,15 @@ def test_collect_benchmark_result(tmp_path):
     (tmp_path / "benchmark1.query_char_count").write_text("50")
     (tmp_path / "benchmark1.totalrows").write_text("10")
 
-    # Collect the benchmark results
+    # Act: Collect the benchmark results from the temporary directory.
+    # The second argument '1' is a placeholder for the number of runs.
     df, error_message = run_and_publish_benchmark.collect_benchmark_result(
         str(tmp_path), 1
     )
 
-    # Assert that the DataFrame is correct
-    assert error_message is None
-    assert len(df) == 1
+    # Assert: Verify the contents and structure of the resulting DataFrame.
+    assert error_message is None, "Expected no error messages."
+    assert len(df) == 1, "DataFrame should contain exactly one row."
     assert df["Benchmark_Name"][0] == "benchmark1"
     assert df["Bytes_Processed"][0] == 100
     assert df["Slot_Millis"][0] == 1000

From 69ec49f45d5e5e16847411db98da347d50168e14 Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Fri, 8 Aug 2025 05:34:45 +0000
Subject: [PATCH 6/8] let the system generate *.totalrows files during
 notebook execution

---
 bigframes/session/metrics.py | 34 ++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 36e48ee9ec..caa83bad63 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -32,6 +32,7 @@ class ExecutionMetrics:
     bytes_processed: int = 0
     execution_secs: float = 0
     query_char_count: int = 0
+    total_rows: int = 0
 
     def count_job_stats(
         self,
@@ -46,11 +47,13 @@ def count_job_stats(
             query_char_count = len(getattr(row_iterator, "query", ""))
             slot_millis = getattr(row_iterator, "slot_millis", 0)
             exec_seconds = 0.0
+            total_rows = getattr(row_iterator, "total_rows", 0) or 0
 
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
+            self.total_rows += total_rows
 
         elif query_job.configuration.dry_run:
             query_char_count = len(query_job.query)
@@ -59,20 +62,22 @@ def count_job_stats(
             bytes_processed = 0
             slot_millis = 0
             exec_seconds = 0.0
+            total_rows = 0
 
         elif (stats := get_performance_stats(query_job)) is not None:
-            query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+            (
+                query_char_count,
+                bytes_processed,
+                slot_millis,
+                exec_seconds,
+                total_rows,
+            ) = stats
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
             self.execution_secs += exec_seconds
-            write_stats_to_disk(
-                query_char_count=query_char_count,
-                bytes_processed=bytes_processed,
-                slot_millis=slot_millis,
-                exec_seconds=exec_seconds,
-            )
+            self.total_rows += total_rows
 
         else:
             # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
@@ -80,18 +85,20 @@ def count_job_stats(
             query_char_count = 0
             slot_millis = 0
             exec_seconds = 0
+            total_rows = 0
 
         write_stats_to_disk(
             query_char_count=query_char_count,
             bytes_processed=bytes_processed,
             slot_millis=slot_millis,
             exec_seconds=exec_seconds,
+            total_rows=total_rows,
         )
 
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
-) -> Optional[Tuple[int, int, int, float]]:
+) -> Optional[Tuple[int, int, int, float, int]]:
     """Parse the query job for performance stats.
 
     Return None if the stats do not reflect real work done in bigquery.
@@ -114,6 +121,9 @@ def get_performance_stats(
     execution_secs = (query_job.ended - query_job.created).total_seconds()
     query_char_count = len(query_job.query)
 
+    # Extract total rows from query job
+    total_rows = getattr(query_job, "total_rows", 0) or 0
+
     return (
         query_char_count,
         # Not every job populates these. For example, slot_millis is missing
@@ -121,6 +131,7 @@ def get_performance_stats(
         bytes_processed if bytes_processed else 0,
         slot_millis if slot_millis else 0,
         execution_secs,
+        total_rows,
     )
 
 
@@ -130,6 +141,7 @@ def write_stats_to_disk(
     query_char_count: int,
     bytes_processed: int,
     slot_millis: int,
     exec_seconds: float,
+    total_rows: Optional[int] = None,
 ):
     """For pytest runs only, log information about the query job to a file
     in order to create a performance report.
@@ -164,3 +176,9 @@ def write_stats_to_disk(
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
+
+    # store total rows
+    if total_rows is not None:
+        total_rows_file = os.path.join(current_directory, test_name + ".totalrows")
+        with open(total_rows_file, "a") as f:
+            f.write(str(total_rows) + "\n")

From 5d1b7020cc3954cfc0270e5e0dbb2047e6fd28c1 Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Fri, 8 Aug 2025 19:17:34 +0000
Subject: [PATCH 7/8] Revert "let the system generate *.totalrows files during
 notebook execution"

This reverts commit 48cb9c123cf136295663a23e67ed6d0a5a4857b4.
---
 bigframes/session/metrics.py | 34 ++++++++--------------------------
 1 file changed, 8 insertions(+), 26 deletions(-)

diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index caa83bad63..36e48ee9ec 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -32,7 +32,6 @@ class ExecutionMetrics:
     bytes_processed: int = 0
     execution_secs: float = 0
     query_char_count: int = 0
-    total_rows: int = 0
 
     def count_job_stats(
         self,
@@ -47,13 +46,11 @@ def count_job_stats(
             query_char_count = len(getattr(row_iterator, "query", ""))
             slot_millis = getattr(row_iterator, "slot_millis", 0)
             exec_seconds = 0.0
-            total_rows = getattr(row_iterator, "total_rows", 0) or 0
 
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
-            self.total_rows += total_rows
 
         elif query_job.configuration.dry_run:
             query_char_count = len(query_job.query)
@@ -62,22 +59,20 @@ def count_job_stats(
             bytes_processed = 0
             slot_millis = 0
             exec_seconds = 0.0
-            total_rows = 0
 
         elif (stats := get_performance_stats(query_job)) is not None:
-            (
-                query_char_count,
-                bytes_processed,
-                slot_millis,
-                exec_seconds,
-                total_rows,
-            ) = stats
+            query_char_count, bytes_processed, slot_millis, exec_seconds = stats
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
             self.execution_secs += exec_seconds
-            self.total_rows += total_rows
+            write_stats_to_disk(
+                query_char_count=query_char_count,
+                bytes_processed=bytes_processed,
+                slot_millis=slot_millis,
+                exec_seconds=exec_seconds,
+            )
 
         else:
             # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
@@ -85,20 +80,18 @@ def count_job_stats(
             query_char_count = 0
             slot_millis = 0
             exec_seconds = 0
-            total_rows = 0
 
         write_stats_to_disk(
             query_char_count=query_char_count,
             bytes_processed=bytes_processed,
             slot_millis=slot_millis,
             exec_seconds=exec_seconds,
-            total_rows=total_rows,
         )
 
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
-) -> Optional[Tuple[int, int, int, float, int]]:
+) -> Optional[Tuple[int, int, int, float]]:
     """Parse the query job for performance stats.
 
     Return None if the stats do not reflect real work done in bigquery.
@@ -121,9 +114,6 @@ def get_performance_stats(
     execution_secs = (query_job.ended - query_job.created).total_seconds()
     query_char_count = len(query_job.query)
 
-    # Extract total rows from query job
-    total_rows = getattr(query_job, "total_rows", 0) or 0
-
     return (
         query_char_count,
         # Not every job populates these. For example, slot_millis is missing
@@ -131,7 +121,6 @@ def get_performance_stats(
         bytes_processed if bytes_processed else 0,
         slot_millis if slot_millis else 0,
         execution_secs,
-        total_rows,
     )
 
 
@@ -141,7 +130,6 @@ def write_stats_to_disk(
     query_char_count: int,
     bytes_processed: int,
     slot_millis: int,
     exec_seconds: float,
-    total_rows: Optional[int] = None,
 ):
     """For pytest runs only, log information about the query job to a file
     in order to create a performance report.
@@ -176,9 +164,3 @@ def write_stats_to_disk(
     bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
     with open(bytes_file, "a") as f:
         f.write(str(bytes_processed) + "\n")
-
-    # store total rows
-    if total_rows is not None:
-        total_rows_file = os.path.join(current_directory, test_name + ".totalrows")
-        with open(total_rows_file, "a") as f:
-            f.write(str(total_rows) + "\n")

From fb93fd3288938834c3a047a5c769c60782691e2d Mon Sep 17 00:00:00 2001
From: Shuowei Li
Date: Fri, 8 Aug 2025 20:59:29 +0000
Subject: [PATCH 8/8] fix e2e failure

---
 scripts/run_and_publish_benchmark.py | 30 ++++++++++++++++++----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/scripts/run_and_publish_benchmark.py b/scripts/run_and_publish_benchmark.py
index 78f9f4f29b..4fdfaf09d6 100644
--- a/scripts/run_and_publish_benchmark.py
+++ b/scripts/run_and_publish_benchmark.py
@@ -99,7 +99,6 @@ def collect_benchmark_result(
         <= len(bytes_files)
         == len(query_char_count_files)
         == len(local_seconds_files)
-        == len(total_rows_files)
     ):
         raise ValueError(
             "Mismatch in the number of report files for bytes, millis, seconds and query char count: \n"
@@ -108,10 +107,10 @@ def collect_benchmark_result(
             f"bytes_files: {len(bytes_files)}\n"
             f"query_char_count_files: {len(query_char_count_files)}\n"
             f"local_seconds_files: {len(local_seconds_files)}\n"
-            f"total_rows_files: {len(total_rows_files)}\n"
         )
 
     has_full_metrics = len(bq_seconds_files) == len(local_seconds_files)
+    has_total_rows = len(total_rows_files) == len(local_seconds_files)
 
     for idx in range(len(local_seconds_files)):
         query_char_count_file = query_char_count_files[idx]
@@ -141,18 +140,14 @@ def collect_benchmark_result(
         if not has_full_metrics:
             total_slot_millis = None
             bq_seconds = None
-            total_rows = None
         else:
             millis_file = millis_files[idx]
             bq_seconds_file = bq_seconds_files[idx]
-            total_rows_file = total_rows_files[idx]
-            if (
-                filename != millis_file.relative_to(path).with_suffix("")
-                or filename != bq_seconds_file.relative_to(path).with_suffix("")
-                or filename != total_rows_file.relative_to(path).with_suffix("")
-            ):
+            if filename != millis_file.relative_to(path).with_suffix(
+                ""
+            ) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
                 raise ValueError(
-                    "File name mismatch among query_char_count, bytes, millis, seconds and total_rows reports."
+                    "File name mismatch among query_char_count, bytes, millis, and seconds reports."
                 )
 
             with open(millis_file, "r") as file:
@@ -163,6 +158,15 @@ def collect_benchmark_result(
                 lines = file.read().splitlines()
                 bq_seconds = sum(float(line) for line in lines) / iterations
 
+        if not has_total_rows:
+            total_rows = None
+        else:
+            total_rows_file = total_rows_files[idx]
+            if filename != total_rows_file.relative_to(path).with_suffix(""):
+                raise ValueError(
+                    "File name mismatch among query_char_count, bytes, and total_rows reports."
+                )
+
             with open(total_rows_file, "r") as file:
                 lines = file.read().splitlines()
                 total_rows = sum(int(line) for line in lines) / iterations
@@ -220,7 +224,11 @@ def collect_benchmark_result(
         print(
             f"{index} - query count: {row['Query_Count']},"
             + f" query char count: {row['Query_Char_Count']},"
-            + f" total rows: {row['Total_Rows']},"
+            + (
+                f" total rows: {row['Total_Rows']},"
+                if not pd.isna(row["Total_Rows"])
+                else ""
+            )
             + f" bytes processed sum: {row['Bytes_Processed']},"
             + (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
             + f" local execution time: {formatted_local_exec_time} seconds"
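
A minimal standalone sketch of the .totalrows aggregation that PATCH 8/8 converges on, for tracing the logic outside the full script: the row-count reports are optional, and Total_Rows is populated only when every benchmark produced one. The helper average_total_rows, the file layout, and the iteration count below are illustrative assumptions, not code from the patches.

# Hypothetical sketch of the optional .totalrows handling (mirrors the
# has_total_rows guard introduced in PATCH 8/8; names are illustrative).
from pathlib import Path
from typing import Dict, Optional


def average_total_rows(report_dir: str, iterations: int) -> Optional[Dict[str, float]]:
    path = Path(report_dir)
    local_seconds_files = sorted(path.rglob("*.local_exec_time_seconds"))
    total_rows_files = sorted(path.rglob("*.totalrows"))

    # Total_Rows is trusted only when there is one .totalrows report per benchmark.
    if len(total_rows_files) != len(local_seconds_files):
        return None

    results: Dict[str, float] = {}
    for total_rows_file in total_rows_files:
        # Each line holds the row count from one iteration; report the average.
        lines = total_rows_file.read_text().splitlines()
        name = str(total_rows_file.relative_to(path).with_suffix(""))
        results[name] = sum(int(line) for line in lines) / iterations
    return results


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        # Two iterations of one benchmark: 10 rows, then 12 rows -> average 11.0.
        (Path(tmp) / "benchmark1.local_exec_time_seconds").write_text("2.0\n2.5\n")
        (Path(tmp) / "benchmark1.totalrows").write_text("10\n12\n")
        print(average_total_rows(tmp, iterations=2))  # {'benchmark1': 11.0}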