Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bigframes/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class TimeTravelDisabledWarning(Warning):
"""A query was reattempted without time travel."""


class TimeTravelCacheWarning(Warning):
    """Reading the same table twice in one session reuses the cached time travel timestamp."""


class AmbiguousWindowWarning(Warning):
    """A query may produce nondeterministic results as the window may be ambiguously ordered."""

Expand Down
7 changes: 6 additions & 1 deletion bigframes/pandas/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
Tuple,
Union,
)
import warnings

import bigframes_vendored.constants as constants
import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
Expand Down Expand Up @@ -348,7 +349,11 @@ def _read_gbq_colab(
)
_set_default_session_location_if_possible_deferred_query(create_query)
if not config.options.bigquery._session_started:
config.options.bigquery.enable_polars_execution = True
with warnings.catch_warnings():
# Don't warn about Polars in the SQL cell.
# Related to b/437090788.
warnings.simplefilter("ignore", bigframes.exceptions.PreviewWarning)
config.options.bigquery.enable_polars_execution = True

return global_session.with_default_session(
bigframes.session.Session._read_gbq_colab,
Expand Down
192 changes: 123 additions & 69 deletions bigframes/session/_io/bigquery/read_gbq_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,43 @@ def get_table_metadata(

cached_table = cache.get(table_ref)
if use_cache and cached_table is not None:
snapshot_timestamp, _ = cached_table

# Cache hit could be unexpected. See internal issue 329545805.
# Raise a warning with more information about how to avoid the
# problems with the cache.
msg = bfe.format_message(
f"Reading cached table from {snapshot_timestamp} to avoid "
"incompatibilies with previous reads of this table. To read "
"the latest version, set `use_cache=False` or close the "
"current session with Session.close() or "
"bigframes.pandas.close_session()."
)
# There are many layers before we get to (possibly) the user's code:
# pandas.read_gbq_table
# -> with_default_session
# -> Session.read_gbq_table
# -> _read_gbq_table
# -> _get_snapshot_sql_and_primary_key
# -> get_snapshot_datetime_and_table_metadata
warnings.warn(msg, stacklevel=7)
snapshot_timestamp, table = cached_table

if is_time_travel_eligible(
bqclient=bqclient,
table=table,
columns=None,
snapshot_time=snapshot_timestamp,
filter_str=None,
# Don't warn, because that will already have been taken care of.
should_warn=False,
should_dry_run=False,
):
# This warning should only happen if the cached snapshot_time will
# have any effect on bigframes (b/437090788). For example, with
# cached query results, such as after re-running a query, time
# travel won't be applied and thus this check is irrelevant.
#
# In other cases, such as an explicit read_gbq_table(), Cache hit
# could be unexpected. See internal issue 329545805. Raise a
# warning with more information about how to avoid the problems
# with the cache.
msg = bfe.format_message(
f"Reading cached table from {snapshot_timestamp} to avoid "
"incompatibilies with previous reads of this table. To read "
"the latest version, set `use_cache=False` or close the "
"current session with Session.close() or "
"bigframes.pandas.close_session()."
)
# There are many layers before we get to (possibly) the user's code:
# pandas.read_gbq_table
# -> with_default_session
# -> Session.read_gbq_table
# -> _read_gbq_table
# -> _get_snapshot_sql_and_primary_key
# -> get_snapshot_datetime_and_table_metadata
warnings.warn(msg, category=bfe.TimeTravelCacheWarning, stacklevel=7)

return cached_table

table = bqclient.get_table(table_ref)
Expand All @@ -88,77 +105,114 @@ def get_table_metadata(
return cached_table


def validate_table(
def is_time_travel_eligible(
bqclient: bigquery.Client,
table: bigquery.table.Table,
columns: Optional[Sequence[str]],
snapshot_time: datetime.datetime,
filter_str: Optional[str] = None,
) -> bool:
"""Validates that the table can be read, returns True iff snapshot is supported."""
*,
should_warn: bool,
should_dry_run: bool,
):
"""Check if a table is eligible to use time-travel.


Args:
table: BigQuery table to check.
should_warn:
If true, raises a warning when time travel is disabled and the
underlying table is likely mutable.

Returns:
bool:
True if there is a chance that time travel may be supported on this
table. If ``should_dry_run`` is True, then this is validated with a
``dry_run`` query.
"""

# user code
# -> pandas.read_gbq_table
# -> with_default_session
# -> session.read_gbq_table
# -> session._read_gbq_table
# -> loader.read_gbq_table
# -> is_time_travel_eligible
stacklevel = 7

time_travel_not_found = False
# Anonymous dataset, does not support snapshot ever
if table.dataset_id.startswith("_"):
pass
return False

# Only true tables support time travel
elif table.table_id.endswith("*"):
msg = bfe.format_message(
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that "
"modifications to the underlying data may result in errors or "
"unexpected behavior."
)
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
if table.table_id.endswith("*"):
if should_warn:
msg = bfe.format_message(
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that as materialized views "
"are updated periodically, modifications to the underlying data in the view may "
"result in errors or unexpected behavior."
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that "
"modifications to the underlying data may result in errors or "
"unexpected behavior."
)
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
else:
# table might support time travel, lets do a dry-run query with time travel
warnings.warn(
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
)
return False
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
if should_warn:
msg = bfe.format_message(
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that as materialized views "
"are updated periodically, modifications to the underlying data in the view may "
"result in errors or unexpected behavior."
)
warnings.warn(
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
)
return False

# table might support time travel, let's do a dry-run query with time travel
if should_dry_run:
snapshot_sql = bigframes.session._io.bigquery.to_query(
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
columns=columns or (),
sql_predicate=filter_str,
time_travel_timestamp=snapshot_time,
)
try:
# If this succeeds, we don't need to query without time travel, that would surely succeed
bqclient.query_and_wait(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
# If this succeeds, we know that time travel will for sure work.
bigframes.session._io.bigquery.start_query_with_client(
bq_client=bqclient,
sql=snapshot_sql,
job_config=bigquery.QueryJobConfig(dry_run=True),
location=None,
project=None,
timeout=None,
metrics=None,
query_with_job=False,
)
return True

except google.api_core.exceptions.NotFound:
# note that a notfound caused by a simple typo will be
# caught above when the metadata is fetched, not here
time_travel_not_found = True

# At this point, time travel is known to fail, but can we query without time travel?
snapshot_sql = bigframes.session._io.bigquery.to_query(
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
columns=columns or (),
sql_predicate=filter_str,
time_travel_timestamp=None,
)
# Any errors here should just be raised to user
bqclient.query_and_wait(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
)
if time_travel_not_found:
msg = bfe.format_message(
"NotFound error when reading table with time travel."
" Attempting query without time travel. Warning: Without"
" time travel, modifications to the underlying table may"
" result in errors or unexpected behavior."
)
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
return False
# If system time isn't supported, it returns NotFound error?
# Note that a notfound caused by a simple typo will be
# caught above when the metadata is fetched, not here.
if should_warn:
msg = bfe.format_message(
"NotFound error when reading table with time travel."
" Attempting query without time travel. Warning: Without"
" time travel, modifications to the underlying table may"
" result in errors or unexpected behavior."
)
warnings.warn(
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
)

# If we make it to here, we know for sure that time travel won't work.
return False
else:
# We haven't validated it, but there's a chance that time travel could work.
return True


def infer_unique_columns(
Expand Down
21 changes: 9 additions & 12 deletions bigframes/session/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,18 +744,15 @@ def read_gbq_table(
else (*columns, *[col for col in index_cols if col not in columns])
)

try:
enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
self._bqclient,
table,
all_columns,
time_travel_timestamp,
filter_str,
)
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
raise
enable_snapshot = enable_snapshot and bf_read_gbq_table.is_time_travel_eligible(
self._bqclient,
table,
all_columns,
time_travel_timestamp,
filter_str,
should_warn=True,
should_dry_run=True,
)

# ----------------------------
# Create ordering and validate
Expand Down
38 changes: 36 additions & 2 deletions tests/unit/session/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,46 @@ def test_read_gbq_cached_table():
)
session.bqclient.get_table.return_value = table

with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
with pytest.warns(
bigframes.exceptions.TimeTravelCacheWarning, match=re.escape("use_cache=False")
):
df = session.read_gbq("my-project.my_dataset.my_table")

assert "1999-01-02T03:04:05.678901" in df.sql


def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_include_time_travel():
    """Cached reads of anonymous-dataset tables neither warn nor apply time travel.

    Datasets whose ID starts with "_" (anonymous query-result datasets) never
    support FOR SYSTEM_TIME AS OF, so a cache hit on such a table must not
    raise ``TimeTravelCacheWarning`` and the generated SQL must not contain
    the cached snapshot timestamp. Related to internal issue b/437090788.
    """
    session = mocks.create_bigquery_session()
    # Leading underscore marks the dataset as anonymous (not time-travel eligible).
    table_ref = google.cloud.bigquery.TableReference(
        google.cloud.bigquery.DatasetReference("my-project", "_anonymous_dataset"),
        "my_table",
    )
    table = google.cloud.bigquery.Table(
        table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),)
    )
    table._properties["location"] = session._location
    table._properties["numRows"] = "1000000000"
    # NOTE(review): "location" is assigned twice (identical value) — the
    # second assignment is redundant; confirm and consider removing.
    table._properties["location"] = session._location
    table._properties["type"] = "TABLE"
    # Seed the session's snapshot cache so read_gbq takes the cache-hit path.
    session._loader._df_snapshot[table_ref] = (
        datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
        table,
    )

    # Stub out uniqueness-stats query and metadata fetch so no network is used.
    session.bqclient.query_and_wait = mock.MagicMock(
        return_value=({"total_count": 3, "distinct_count": 2},)
    )
    session.bqclient.get_table.return_value = table

    # Escalate TimeTravelCacheWarning to an error: the read below must not emit it.
    with warnings.catch_warnings():
        warnings.simplefilter(
            "error", category=bigframes.exceptions.TimeTravelCacheWarning
        )
        df = session.read_gbq("my-project._anonymous_dataset.my_table")

    # The cached snapshot timestamp must not leak into the generated SQL.
    assert "1999-01-02T03:04:05.678901" not in df.sql


@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
def test_default_index_warning_raised_by_read_gbq(table):
"""Because of the windowing operation to create a default index, row
Expand Down Expand Up @@ -474,7 +508,7 @@ def get_table_mock(table_ref):
google.api_core.exceptions.Forbidden,
match="Check https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions.",
):
api(query_or_table)
api(query_or_table).to_pandas()


@mock.patch.dict(os.environ, {}, clear=True)
Expand Down