From d99f12d447d18d39f4a7066be6ca0f1304b48be8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tim=20Swe=C3=B1a?=
Date: Mon, 11 Aug 2025 14:58:56 +0000
Subject: [PATCH] fix: make ExecutionMetrics stats tracking more robust to missing stats

---
 GEMINI.md                          |   2 +
 bigframes/session/metrics.py       |  14 +-
 tests/unit/session/test_metrics.py | 247 +++++++++++++++++++++++++++++
 3 files changed, 256 insertions(+), 7 deletions(-)
 create mode 100644 tests/unit/session/test_metrics.py

diff --git a/GEMINI.md b/GEMINI.md
index 8c37580c7f..d26a51ebfc 100644
--- a/GEMINI.md
+++ b/GEMINI.md
@@ -45,6 +45,8 @@ We use `nox` to instrument our tests.
   nox -r -s lint
   ```
 
+- When writing tests, use the idiomatic "pytest" style: plain test functions and fixtures with bare `assert` statements, rather than `unittest.TestCase` classes.
+
 ## Documentation
 
 If a method or property is implementing the same interface as a third-party
diff --git a/bigframes/session/metrics.py b/bigframes/session/metrics.py
index 36e48ee9ec..8ec8d525cc 100644
--- a/bigframes/session/metrics.py
+++ b/bigframes/session/metrics.py
@@ -42,9 +42,9 @@ def count_job_stats(
             assert row_iterator is not None
 
             # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
-            bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
-            query_char_count = len(getattr(row_iterator, "query", ""))
-            slot_millis = getattr(row_iterator, "slot_millis", 0)
+            bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
+            query_char_count = len(getattr(row_iterator, "query", "") or "")
+            slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
             exec_seconds = 0.0
 
             self.execution_count += 1
@@ -63,10 +63,10 @@ def count_job_stats(
         elif (stats := get_performance_stats(query_job)) is not None:
             query_char_count, bytes_processed, slot_millis, exec_seconds = stats
             self.execution_count += 1
-            self.query_char_count += query_char_count
-            self.bytes_processed += bytes_processed
-            self.slot_millis += slot_millis
-            self.execution_secs += exec_seconds
+            self.query_char_count += query_char_count or 0
+            self.bytes_processed += bytes_processed or 0
+            self.slot_millis += slot_millis or 0
+            self.execution_secs += exec_seconds or 0
             write_stats_to_disk(
                 query_char_count=query_char_count,
                 bytes_processed=bytes_processed,
diff --git a/tests/unit/session/test_metrics.py b/tests/unit/session/test_metrics.py
new file mode 100644
index 0000000000..7c2f01c5b9
--- /dev/null
+++ b/tests/unit/session/test_metrics.py
@@ -0,0 +1,247 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import os
+import unittest.mock
+
+import google.cloud.bigquery as bigquery
+import pytest
+
+import bigframes.session.metrics as metrics
+
+NOW = datetime.datetime.now(datetime.timezone.utc)
+
+
+def test_count_job_stats_with_row_iterator():
+    row_iterator = unittest.mock.create_autospec(
+        bigquery.table.RowIterator, instance=True
+    )
+    row_iterator.total_bytes_processed = 1024
+    row_iterator.query = "SELECT * FROM table"
+    row_iterator.slot_millis = 1234
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(row_iterator=row_iterator)
+
+    assert execution_metrics.execution_count == 1
+    assert execution_metrics.bytes_processed == 1024
+    assert execution_metrics.query_char_count == 19
+    assert execution_metrics.slot_millis == 1234
+
+
+def test_count_job_stats_with_row_iterator_missing_stats():
+    row_iterator = unittest.mock.create_autospec(
+        bigquery.table.RowIterator, instance=True
+    )
+    # Simulate properties not being present on the object
+    del row_iterator.total_bytes_processed
+    del row_iterator.query
+    del row_iterator.slot_millis
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(row_iterator=row_iterator)
+
+    assert execution_metrics.execution_count == 1
+    assert execution_metrics.bytes_processed == 0
+    assert execution_metrics.query_char_count == 0
+    assert execution_metrics.slot_millis == 0
+
+
+def test_count_job_stats_with_row_iterator_none_stats():
+    row_iterator = unittest.mock.create_autospec(
+        bigquery.table.RowIterator, instance=True
+    )
+    row_iterator.total_bytes_processed = None
+    row_iterator.query = None
+    row_iterator.slot_millis = None
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(row_iterator=row_iterator)
+
+    assert execution_metrics.execution_count == 1
+    assert execution_metrics.bytes_processed == 0
+    assert execution_metrics.query_char_count == 0
+    assert execution_metrics.slot_millis == 0
+
+
+def test_count_job_stats_with_dry_run():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = True
+    query_job.query = "SELECT * FROM table"
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(query_job=query_job)
+
+    # Dry run jobs shouldn't count as "executed"
+    assert execution_metrics.execution_count == 0
+    assert execution_metrics.bytes_processed == 0
+    assert execution_metrics.query_char_count == 0
+    assert execution_metrics.slot_millis == 0
+
+
+def test_count_job_stats_with_valid_job():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.query = "SELECT * FROM table"
+    query_job.total_bytes_processed = 2048
+    query_job.slot_millis = 5678
+    query_job.created = NOW
+    query_job.ended = NOW + datetime.timedelta(seconds=2)
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(query_job=query_job)
+
+    assert execution_metrics.execution_count == 1
+    assert execution_metrics.bytes_processed == 2048
+    assert execution_metrics.query_char_count == 19
+    assert execution_metrics.slot_millis == 5678
+    assert execution_metrics.execution_secs == pytest.approx(2.0)
+
+
+def test_count_job_stats_with_cached_job():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.query = "SELECT * FROM table"
+    # Cache hit jobs don't have total_bytes_processed or slot_millis
+    query_job.total_bytes_processed = None
+    query_job.slot_millis = None
+    query_job.created = NOW
+    query_job.ended = NOW + datetime.timedelta(seconds=1)
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(query_job=query_job)
+
+    assert execution_metrics.execution_count == 1
+    assert execution_metrics.bytes_processed == 0
+    assert execution_metrics.query_char_count == 19
+    assert execution_metrics.slot_millis == 0
+    assert execution_metrics.execution_secs == pytest.approx(1.0)
+
+
+def test_count_job_stats_with_unsupported_job():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.query = "SELECT * FROM table"
+    # Some jobs, such as scripts, don't have these properties.
+    query_job.total_bytes_processed = None
+    query_job.slot_millis = None
+    query_job.created = None
+    query_job.ended = None
+    execution_metrics = metrics.ExecutionMetrics()
+    execution_metrics.count_job_stats(query_job=query_job)
+
+    # Don't count jobs if we can't get performance stats.
+    assert execution_metrics.execution_count == 0
+    assert execution_metrics.bytes_processed == 0
+    assert execution_metrics.query_char_count == 0
+    assert execution_metrics.slot_millis == 0
+    assert execution_metrics.execution_secs == pytest.approx(0.0)
+
+
+def test_get_performance_stats_with_valid_job():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.query = "SELECT * FROM table"
+    query_job.total_bytes_processed = 2048
+    query_job.slot_millis = 5678
+    query_job.created = NOW
+    query_job.ended = NOW + datetime.timedelta(seconds=2)
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is not None
+    query_char_count, bytes_processed, slot_millis, exec_seconds = stats
+    assert query_char_count == 19
+    assert bytes_processed == 2048
+    assert slot_millis == 5678
+    assert exec_seconds == pytest.approx(2.0)
+
+
+def test_get_performance_stats_with_dry_run():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = True
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is None
+
+
+def test_get_performance_stats_with_missing_timestamps():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.created = None
+    query_job.ended = NOW
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is None
+
+    query_job.created = NOW
+    query_job.ended = None
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is None
+
+
+def test_get_performance_stats_with_mocked_types():
+    query_job = unittest.mock.create_autospec(bigquery.QueryJob, instance=True)
+    query_job.configuration.dry_run = False
+    query_job.created = NOW
+    query_job.ended = NOW
+    query_job.total_bytes_processed = unittest.mock.Mock()
+    query_job.slot_millis = 123
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is None
+
+    query_job.total_bytes_processed = 123
+    query_job.slot_millis = unittest.mock.Mock()
+    stats = metrics.get_performance_stats(query_job)
+    assert stats is None
+
+
+@pytest.fixture
+def mock_environ(monkeypatch):
+    """Fixture to mock os.environ."""
+    monkeypatch.setenv(metrics.LOGGING_NAME_ENV_VAR, "my_test_case")
+
+
+def test_write_stats_to_disk_writes_files(tmp_path, mock_environ):
+    os.chdir(tmp_path)
+    test_name = os.environ[metrics.LOGGING_NAME_ENV_VAR]
+    metrics.write_stats_to_disk(
+        query_char_count=100,
+        bytes_processed=200,
+        slot_millis=300,
+        exec_seconds=1.23,
+    )
+
+    slot_file = tmp_path / (test_name + ".slotmillis")
+    assert slot_file.exists()
+    with open(slot_file) as f:
+        assert f.read() == "300\n"
+
+    exec_time_file = tmp_path / (test_name + ".bq_exec_time_seconds")
+    assert exec_time_file.exists()
+    with open(exec_time_file) as f:
+        assert f.read() == "1.23\n"
+
+    query_char_count_file = tmp_path / (test_name + ".query_char_count")
+    assert query_char_count_file.exists()
+    with open(query_char_count_file) as f:
+        assert f.read() == "100\n"
+
+    bytes_file = tmp_path / (test_name + ".bytesprocessed")
+    assert bytes_file.exists()
+    with open(bytes_file) as f:
+        assert f.read() == "200\n"
+
+
+def test_write_stats_to_disk_no_env_var(tmp_path, monkeypatch):
+    monkeypatch.delenv(metrics.LOGGING_NAME_ENV_VAR, raising=False)
+    os.chdir(tmp_path)
+    metrics.write_stats_to_disk(
+        query_char_count=100,
+        bytes_processed=200,
+        slot_millis=300,
+        exec_seconds=1.23,
+    )
+    assert len(list(tmp_path.iterdir())) == 0
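
The reason the `or 0` / `or ""` coalescing is needed on top of the getattr() defaults: getattr(obj, name, default) only falls back when the attribute is missing entirely, while cache hits and script jobs produce job objects whose stats attributes are present but set to None (see the comments in test_count_job_stats_with_cached_job and test_count_job_stats_with_unsupported_job above). A minimal pytest-style sketch of that distinction; FakeRowIterator is a hypothetical stand-in used for illustration only, not part of the patch:

    # Sketch only: FakeRowIterator is a stand-in, not a real BigQuery RowIterator.
    class FakeRowIterator:
        total_bytes_processed = None  # attribute exists, but holds None (e.g. a cache hit)

    def test_getattr_default_does_not_cover_none():
        row = FakeRowIterator()
        # The getattr() default applies only when the attribute is missing entirely.
        assert getattr(row, "total_bytes_processed", 0) is None
        # Coalescing with `or 0` normalizes both the missing and the present-but-None case.
        assert (getattr(row, "total_bytes_processed", 0) or 0) == 0
        assert (getattr(row, "no_such_attribute", 0) or 0) == 0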