feat: session.bytes_processed_sum will be updated when allow_large_re… #1669

Merged (4 commits) on Apr 29, 2025
7 changes: 0 additions & 7 deletions bigframes/session/__init__.py
@@ -336,13 +336,6 @@ def _project(self):
@property
def bytes_processed_sum(self):
"""The sum of all bytes processed by bigquery jobs using this session."""
msg = bfe.format_message(
"Queries executed with `allow_large_results=False` within the session will not "
"have their bytes processed counted in this sum. If you need precise "
"bytes processed information, query the `INFORMATION_SCHEMA` tables "
"to get relevant metrics.",
)
warnings.warn(msg, UserWarning)
return self._metrics.bytes_processed

@property
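With the warning removed, reading the property is now straightforward. A minimal usage sketch, assuming the allow_large_results option lives under bpd.options.bigquery and that get_global_session() is available on bigframes.pandas (the query itself is illustrative):

# Sketch: the session-level counter after a small query run with
# allow_large_results=False; per this PR, such queries now contribute
# to bytes_processed_sum as well.
import bigframes.pandas as bpd

bpd.options.bigquery.allow_large_results = False  # assumed option name
df = bpd.read_gbq("SELECT 1 AS x")
df.to_pandas()

session = bpd.get_global_session()
print(session.bytes_processed_sum)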
5 changes: 2 additions & 3 deletions bigframes/session/_io/bigquery/__init__.py
@@ -27,7 +27,6 @@
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
import google.api_core.exceptions
import google.cloud.bigquery as bigquery
import google.cloud.bigquery.table

from bigframes.core import log_adapter
import bigframes.core.compile.googlesql as googlesql
@@ -249,7 +248,7 @@ def start_query_with_client(
max_results=max_results,
)
if metrics is not None:
metrics.count_job_stats(query=sql)
metrics.count_job_stats(row_iterator=results_iterator)
return results_iterator, None

query_job = bq_client.query(
@@ -278,7 +277,7 @@
)

if metrics is not None:
metrics.count_job_stats(query_job)
metrics.count_job_stats(query_job=query_job)
return results_iterator, query_job


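For context, the two call sites above correspond to the two execution paths: when no QueryJob exists (jobless execution), stats come from the RowIterator; otherwise they come from the job. A simplified dispatch sketch, not the library's exact code:

# Simplified sketch mirroring the two count_job_stats call sites in this diff.
from typing import Optional

import google.cloud.bigquery as bigquery
import google.cloud.bigquery.table as bq_table


def record_query_stats(
    metrics,  # an ExecutionMetrics instance (see bigframes/session/metrics.py)
    results_iterator: bq_table.RowIterator,
    query_job: Optional[bigquery.QueryJob] = None,
) -> None:
    if query_job is None:
        # Jobless path: only the iterator's metadata is available.
        metrics.count_job_stats(row_iterator=results_iterator)
    else:
        metrics.count_job_stats(query_job=query_job)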
51 changes: 26 additions & 25 deletions bigframes/session/metrics.py
@@ -20,6 +20,7 @@

import google.cloud.bigquery as bigquery
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"

@@ -33,14 +34,22 @@ class ExecutionMetrics:
query_char_count: int = 0

def count_job_stats(
self, query_job: Optional[bq_job.QueryJob] = None, query: str = ""
self,
query_job: Optional[bq_job.QueryJob] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
query_char_count = len(query)
assert row_iterator is not None
if (row_iterator.total_bytes_processed is None) or (
row_iterator.query is None
):
return
query_char_count = len(row_iterator.query)
bytes_processed = row_iterator.total_bytes_processed
self.execution_count += 1
self.query_char_count += query_char_count
if LOGGING_NAME_ENV_VAR in os.environ:
write_stats_to_disk(query_char_count)
self.bytes_processed += bytes_processed
write_stats_to_disk(query_char_count, bytes_processed)
return

stats = get_performance_stats(query_job)
@@ -51,11 +60,9 @@
self.bytes_processed += bytes_processed
self.slot_millis += slot_millis
self.execution_secs += execution_secs
if LOGGING_NAME_ENV_VAR in os.environ:
# when running notebooks via pytest nbmake
write_stats_to_disk(
query_char_count, bytes_processed, slot_millis, execution_secs
)
write_stats_to_disk(
query_char_count, bytes_processed, slot_millis, execution_secs
)


def get_performance_stats(
@@ -88,32 +95,21 @@ def get_performance_stats(

def write_stats_to_disk(
query_char_count: int,
bytes_processed: Optional[int] = None,
bytes_processed: int,
slot_millis: Optional[int] = None,
exec_seconds: Optional[float] = None,
):
"""For pytest runs only, log information about the query job
to a file in order to create a performance report.
"""
if LOGGING_NAME_ENV_VAR not in os.environ:
raise EnvironmentError(
"Environment variable {env_var} is not set".format(
env_var=LOGGING_NAME_ENV_VAR
)
)
return

# when running notebooks via pytest nbmake and running benchmarks
test_name = os.environ[LOGGING_NAME_ENV_VAR]
current_directory = os.getcwd()

if (
(bytes_processed is not None)
and (slot_millis is not None)
and (exec_seconds is not None)
):
# store bytes processed
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
with open(bytes_file, "a") as f:
f.write(str(bytes_processed) + "\n")

if (slot_millis is not None) and (exec_seconds is not None):
# store slot milliseconds
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
with open(slot_file, "a") as f:
@@ -132,3 +128,8 @@ def write_stats_to_disk(
)
with open(query_char_count_file, "a") as f:
f.write(str(query_char_count) + "\n")

# store bytes processed
bytes_file = os.path.join(current_directory, test_name + ".bytesprocessed")
with open(bytes_file, "a") as f:
f.write(str(bytes_processed) + "\n")
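The new RowIterator branch depends on the iterator exposing query and total_bytes_processed (both can be None, hence the early return). A small standalone sketch of that accounting, assuming a recent google-cloud-bigquery client with query_and_wait:

# Sketch: counting bytes processed from a RowIterator, outside bigframes.
# Assumes the installed google-cloud-bigquery returns iterators that carry
# .query and .total_bytes_processed, as the new count_job_stats relies on.
import google.cloud.bigquery as bigquery

client = bigquery.Client()  # requires ambient credentials/project
total_bytes = 0
query_char_count = 0

rows = client.query_and_wait("SELECT 1 AS x")  # RowIterator, possibly jobless
if rows.query is not None and rows.total_bytes_processed is not None:
    query_char_count += len(rows.query)
    total_bytes += rows.total_bytes_processed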
43 changes: 17 additions & 26 deletions scripts/run_and_publish_benchmark.py
@@ -93,10 +93,10 @@ def collect_benchmark_result(
error_files = sorted(path.rglob("*.error"))

if not (
len(bytes_files)
== len(millis_files)
len(millis_files)
== len(bq_seconds_files)
<= len(query_char_count_files)
<= len(bytes_files)
== len(query_char_count_files)
== len(local_seconds_files)
):
raise ValueError(
@@ -108,10 +108,13 @@
for idx in range(len(local_seconds_files)):
query_char_count_file = query_char_count_files[idx]
local_seconds_file = local_seconds_files[idx]
bytes_file = bytes_files[idx]
filename = query_char_count_file.relative_to(path).with_suffix("")
if filename != local_seconds_file.relative_to(path).with_suffix(""):
if filename != local_seconds_file.relative_to(path).with_suffix(
""
) or filename != bytes_file.relative_to(path).with_suffix(""):
raise ValueError(
"File name mismatch between query_char_count and seconds reports."
"File name mismatch among query_char_count, bytes and seconds reports."
)

with open(query_char_count_file, "r") as file:
@@ -123,27 +126,23 @@
lines = file.read().splitlines()
local_seconds = sum(float(line) for line in lines) / iterations

with open(bytes_file, "r") as file:
lines = file.read().splitlines()
total_bytes = sum(int(line) for line in lines) / iterations

if not has_full_metrics:
total_bytes = None
total_slot_millis = None
bq_seconds = None
else:
bytes_file = bytes_files[idx]
millis_file = millis_files[idx]
bq_seconds_file = bq_seconds_files[idx]
if (
filename != bytes_file.relative_to(path).with_suffix("")
or filename != millis_file.relative_to(path).with_suffix("")
or filename != bq_seconds_file.relative_to(path).with_suffix("")
):
if filename != millis_file.relative_to(path).with_suffix(
""
) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
raise ValueError(
"File name mismatch among query_char_count, bytes, millis, and seconds reports."
)

with open(bytes_file, "r") as file:
lines = file.read().splitlines()
total_bytes = sum(int(line) for line in lines) / iterations

with open(millis_file, "r") as file:
lines = file.read().splitlines()
total_slot_millis = sum(int(line) for line in lines) / iterations
@@ -202,11 +201,7 @@ def collect_benchmark_result(
print(
f"{index} - query count: {row['Query_Count']},"
+ f" query char count: {row['Query_Char_Count']},"
+ (
f" bytes processed sum: {row['Bytes_Processed']},"
if has_full_metrics
else ""
)
+ f" bytes processed sum: {row['Bytes_Processed']},"
+ (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
+ f" local execution time: {formatted_local_exec_time} seconds"
+ (
@@ -238,11 +233,7 @@ def collect_benchmark_result(
print(
f"---Geometric mean of queries: {geometric_mean_queries},"
+ f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
+ (
f" Geometric mean of bytes processed: {geometric_mean_bytes},"
if has_full_metrics
else ""
)
+ f" Geometric mean of bytes processed: {geometric_mean_bytes},"
+ (
f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
if has_full_metrics
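Because a .bytesprocessed file is now written for every logged run, the collection step treats bytes like the query-count and local-seconds reports instead of gating it behind has_full_metrics. A short sketch of the per-file aggregation the script performs (file name is hypothetical):

# Sketch: each report file holds one value per query; the script sums the
# lines and divides by the number of benchmark iterations.
from pathlib import Path


def average_metric(report: Path, iterations: int) -> float:
    lines = report.read_text().splitlines()
    return sum(float(line) for line in lines) / iterations


total_bytes = average_metric(Path("my_benchmark.bytesprocessed"), iterations=3)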