From 3e97464cac469494de783e16bc7a1716142baa0a Mon Sep 17 00:00:00 2001 From: Tim Swena Date: Thu, 7 Aug 2025 16:01:11 +0000 Subject: [PATCH] perf: remove an unnecessary extra `dry_run` query from `read_gbq_table` Also, removes some unnecessary warnings from SQL Cell code paths. --- bigframes/exceptions.py | 4 + bigframes/pandas/io/api.py | 7 +- .../session/_io/bigquery/read_gbq_table.py | 192 +++++++++++------- bigframes/session/loader.py | 21 +- tests/unit/session/test_session.py | 38 +++- 5 files changed, 178 insertions(+), 84 deletions(-) diff --git a/bigframes/exceptions.py b/bigframes/exceptions.py index 39a847de84..076988b835 100644 --- a/bigframes/exceptions.py +++ b/bigframes/exceptions.py @@ -79,6 +79,10 @@ class TimeTravelDisabledWarning(Warning): """A query was reattempted without time travel.""" +class TimeTravelCacheWarning(Warning): + """Reads from the same table twice in the same session pull time travel from cache.""" + + class AmbiguousWindowWarning(Warning): """A query may produce nondeterministic results as the window may be ambiguously ordered.""" diff --git a/bigframes/pandas/io/api.py b/bigframes/pandas/io/api.py index a88cc7a011..cf4b4eb19c 100644 --- a/bigframes/pandas/io/api.py +++ b/bigframes/pandas/io/api.py @@ -33,6 +33,7 @@ Tuple, Union, ) +import warnings import bigframes_vendored.constants as constants import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq @@ -348,7 +349,11 @@ def _read_gbq_colab( ) _set_default_session_location_if_possible_deferred_query(create_query) if not config.options.bigquery._session_started: - config.options.bigquery.enable_polars_execution = True + with warnings.catch_warnings(): + # Don't warning about Polars in SQL cell. + # Related to b/437090788. 
+ warnings.simplefilter("ignore", bigframes.exceptions.PreviewWarning) + config.options.bigquery.enable_polars_execution = True return global_session.with_default_session( bigframes.session.Session._read_gbq_colab, diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 6322040428..30a25762eb 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -54,26 +54,43 @@ def get_table_metadata( cached_table = cache.get(table_ref) if use_cache and cached_table is not None: - snapshot_timestamp, _ = cached_table - - # Cache hit could be unexpected. See internal issue 329545805. - # Raise a warning with more information about how to avoid the - # problems with the cache. - msg = bfe.format_message( - f"Reading cached table from {snapshot_timestamp} to avoid " - "incompatibilies with previous reads of this table. To read " - "the latest version, set `use_cache=False` or close the " - "current session with Session.close() or " - "bigframes.pandas.close_session()." - ) - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - # -> get_snapshot_datetime_and_table_metadata - warnings.warn(msg, stacklevel=7) + snapshot_timestamp, table = cached_table + + if is_time_travel_eligible( + bqclient=bqclient, + table=table, + columns=None, + snapshot_time=snapshot_timestamp, + filter_str=None, + # Don't warn, because that will already have been taken care of. + should_warn=False, + should_dry_run=False, + ): + # This warning should only happen if the cached snapshot_time will + # have any effect on bigframes (b/437090788). For example, with + # cached query results, such as after re-running a query, time + # travel won't be applied and thus this check is irrelevent. 
+ # + # In other cases, such as an explicit read_gbq_table(), Cache hit + # could be unexpected. See internal issue 329545805. Raise a + # warning with more information about how to avoid the problems + # with the cache. + msg = bfe.format_message( + f"Reading cached table from {snapshot_timestamp} to avoid " + "incompatibilies with previous reads of this table. To read " + "the latest version, set `use_cache=False` or close the " + "current session with Session.close() or " + "bigframes.pandas.close_session()." + ) + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + warnings.warn(msg, category=bfe.TimeTravelCacheWarning, stacklevel=7) + return cached_table table = bqclient.get_table(table_ref) @@ -88,40 +105,74 @@ def get_table_metadata( return cached_table -def validate_table( +def is_time_travel_eligible( bqclient: bigquery.Client, table: bigquery.table.Table, columns: Optional[Sequence[str]], snapshot_time: datetime.datetime, filter_str: Optional[str] = None, -) -> bool: - """Validates that the table can be read, returns True iff snapshot is supported.""" + *, + should_warn: bool, + should_dry_run: bool, +): + """Check if a table is eligible to use time-travel. + + + Args: + table: BigQuery table to check. + should_warn: + If true, raises a warning when time travel is disabled and the + underlying table is likely mutable. + + Return: + bool: + True if there is a chance that time travel may be supported on this + table. If ``should_dry_run`` is True, then this is validated with a + ``dry_run`` query. 
+ """ + + # user code + # -> pandas.read_gbq_table + # -> with_default_session + # -> session.read_gbq_table + # -> session._read_gbq_table + # -> loader.read_gbq_table + # -> is_time_travel_eligible + stacklevel = 7 - time_travel_not_found = False # Anonymous dataset, does not support snapshot ever if table.dataset_id.startswith("_"): - pass + return False # Only true tables support time travel - elif table.table_id.endswith("*"): - msg = bfe.format_message( - "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. " - "Attempting query without time travel. Be aware that " - "modifications to the underlying data may result in errors or " - "unexpected behavior." - ) - warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) - elif table.table_type != "TABLE": - if table.table_type == "MATERIALIZED_VIEW": + if table.table_id.endswith("*"): + if should_warn: msg = bfe.format_message( - "Materialized views do not support FOR SYSTEM_TIME AS OF queries. " - "Attempting query without time travel. Be aware that as materialized views " - "are updated periodically, modifications to the underlying data in the view may " - "result in errors or unexpected behavior." + "Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. " + "Attempting query without time travel. Be aware that " + "modifications to the underlying data may result in errors or " + "unexpected behavior." ) - warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) - else: - # table might support time travel, lets do a dry-run query with time travel + warnings.warn( + msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel + ) + return False + elif table.table_type != "TABLE": + if table.table_type == "MATERIALIZED_VIEW": + if should_warn: + msg = bfe.format_message( + "Materialized views do not support FOR SYSTEM_TIME AS OF queries. " + "Attempting query without time travel. 
Be aware that as materialized views " + "are updated periodically, modifications to the underlying data in the view may " + "result in errors or unexpected behavior." + ) + warnings.warn( + msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel + ) + return False + + # table might support time travel, lets do a dry-run query with time travel + if should_dry_run: snapshot_sql = bigframes.session._io.bigquery.to_query( query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}", columns=columns or (), @@ -129,36 +180,39 @@ def validate_table( time_travel_timestamp=snapshot_time, ) try: - # If this succeeds, we don't need to query without time travel, that would surely succeed - bqclient.query_and_wait( - snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True) + # If this succeeds, we know that time travel will for sure work. + bigframes.session._io.bigquery.start_query_with_client( + bq_client=bqclient, + sql=snapshot_sql, + job_config=bigquery.QueryJobConfig(dry_run=True), + location=None, + project=None, + timeout=None, + metrics=None, + query_with_job=False, ) return True + except google.api_core.exceptions.NotFound: - # note that a notfound caused by a simple typo will be - # caught above when the metadata is fetched, not here - time_travel_not_found = True - - # At this point, time travel is known to fail, but can we query without time travel? - snapshot_sql = bigframes.session._io.bigquery.to_query( - query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}", - columns=columns or (), - sql_predicate=filter_str, - time_travel_timestamp=None, - ) - # Any errors here should just be raised to user - bqclient.query_and_wait( - snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True) - ) - if time_travel_not_found: - msg = bfe.format_message( - "NotFound error when reading table with time travel." - " Attempting query without time travel. 
Warning: Without" - " time travel, modifications to the underlying table may" - " result in errors or unexpected behavior." - ) - warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) - return False + # If system time isn't supported, it returns NotFound error? + # Note that a notfound caused by a simple typo will be + # caught above when the metadata is fetched, not here. + if should_warn: + msg = bfe.format_message( + "NotFound error when reading table with time travel." + " Attempting query without time travel. Warning: Without" + " time travel, modifications to the underlying table may" + " result in errors or unexpected behavior." + ) + warnings.warn( + msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel + ) + + # If we make it to here, we know for sure that time travel won't work. + return False + else: + # We haven't validated it, but there's a chance that time travel could work. + return True def infer_unique_columns( diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index c264abd860..6500701324 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -744,18 +744,15 @@ def read_gbq_table( else (*columns, *[col for col in index_cols if col not in columns]) ) - try: - enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table( - self._bqclient, - table, - all_columns, - time_travel_timestamp, - filter_str, - ) - except google.api_core.exceptions.Forbidden as ex: - if "Drive credentials" in ex.message: - ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions." 
-            raise
+        enable_snapshot = enable_snapshot and bf_read_gbq_table.is_time_travel_eligible(
+            self._bqclient,
+            table,
+            all_columns,
+            time_travel_timestamp,
+            filter_str,
+            should_warn=True,
+            should_dry_run=True,
+        )
 
         # ----------------------------
         # Create ordering and validate
diff --git a/tests/unit/session/test_session.py b/tests/unit/session/test_session.py
index 26b74a3f8a..63c82eb30f 100644
--- a/tests/unit/session/test_session.py
+++ b/tests/unit/session/test_session.py
@@ -252,12 +252,46 @@ def test_read_gbq_cached_table():
     )
     session.bqclient.get_table.return_value = table
 
-    with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
+    with pytest.warns(
+        bigframes.exceptions.TimeTravelCacheWarning, match=re.escape("use_cache=False")
+    ):
         df = session.read_gbq("my-project.my_dataset.my_table")
 
     assert "1999-01-02T03:04:05.678901" in df.sql
 
 
+def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_include_time_travel():
+    session = mocks.create_bigquery_session()
+    table_ref = google.cloud.bigquery.TableReference(
+        google.cloud.bigquery.DatasetReference("my-project", "_anonymous_dataset"),
+        "my_table",
+    )
+    table = google.cloud.bigquery.Table(
+        table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),)
+    )
+    table._properties["location"] = session._location
+    table._properties["numRows"] = "1000000000"
+    table._properties["location"] = session._location
+    table._properties["type"] = "TABLE"
+    session._loader._df_snapshot[table_ref] = (
+        datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
+        table,
+    )
+
+    session.bqclient.query_and_wait = mock.MagicMock(
+        return_value=({"total_count": 3, "distinct_count": 2},)
+    )
+    session.bqclient.get_table.return_value = table
+
+    with warnings.catch_warnings():
+        warnings.simplefilter(
+            "error", category=bigframes.exceptions.TimeTravelCacheWarning
+        )
+        df = session.read_gbq("my-project._anonymous_dataset.my_table")
+
+    assert "1999-01-02T03:04:05.678901" not in df.sql
+
+
 @pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
 def test_default_index_warning_raised_by_read_gbq(table):
     """Because of the windowing operation to create a default index, row
@@ -474,7 +508,7 @@ def get_table_mock(table_ref):
         google.api_core.exceptions.Forbidden,
         match="Check https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions.",
     ):
-        api(query_or_table)
+        api(query_or_table).to_pandas()
 
 
 @mock.patch.dict(os.environ, {}, clear=True)