diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index d27cd48cdd..10a112c779 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -40,6 +40,7 @@ import weakref import bigframes_vendored.constants as constants +import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery # noqa import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import bigframes_vendored.pandas.io.parquet as third_party_pandas_parquet @@ -2051,6 +2052,7 @@ def _start_query_ml_ddl( project=None, timeout=None, query_with_job=True, + job_retry=third_party_gcb_retry.DEFAULT_ML_JOB_RETRY, ) return iterator, query_job diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index fdc240fa69..83f63e8b9a 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -24,8 +24,10 @@ import typing from typing import Dict, Iterable, Literal, Mapping, Optional, overload, Tuple, Union +import bigframes_vendored.google_cloud_bigquery.retry as third_party_gcb_retry import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq import google.api_core.exceptions +import google.api_core.retry import google.cloud.bigquery as bigquery from bigframes.core import log_adapter @@ -245,7 +247,7 @@ def start_query_with_client( location: Optional[str], project: Optional[str], timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[True], ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: ... @@ -260,8 +262,40 @@ def start_query_with_client( location: Optional[str], project: Optional[str], timeout: Optional[float], - metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, + metrics: Optional[bigframes.session.metrics.ExecutionMetrics], + query_with_job: Literal[False], +) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: + ... + + +@overload +def start_query_with_client( + bq_client: bigquery.Client, + sql: str, + *, + job_config: bigquery.QueryJobConfig, + location: Optional[str], + project: Optional[str], + timeout: Optional[float], + metrics: Optional[bigframes.session.metrics.ExecutionMetrics], + query_with_job: Literal[True], + job_retry: google.api_core.retry.Retry, +) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]: + ... + + +@overload +def start_query_with_client( + bq_client: bigquery.Client, + sql: str, + *, + job_config: bigquery.QueryJobConfig, + location: Optional[str], + project: Optional[str], + timeout: Optional[float], + metrics: Optional[bigframes.session.metrics.ExecutionMetrics], query_with_job: Literal[False], + job_retry: google.api_core.retry.Retry, ) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: ... @@ -276,6 +310,11 @@ def start_query_with_client( timeout: Optional[float] = None, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, query_with_job: bool = True, + # TODO(tswast): We can stop providing our own default once we use a + # google-cloud-bigquery version with + # https://github.com/googleapis/python-bigquery/pull/2256 merged, likely + # version 3.36.0 or later. + job_retry: google.api_core.retry.Retry = third_party_gcb_retry.DEFAULT_JOB_RETRY, ) -> Tuple[bigquery.table.RowIterator, Optional[bigquery.QueryJob]]: """ Starts query job and waits for results. @@ -292,6 +331,7 @@ def start_query_with_client( location=location, project=project, api_timeout=timeout, + job_retry=job_retry, ) if metrics is not None: metrics.count_job_stats(row_iterator=results_iterator) @@ -303,6 +343,7 @@ def start_query_with_client( location=location, project=project, timeout=timeout, + job_retry=job_retry, ) except google.api_core.exceptions.Forbidden as ex: if "Drive credentials" in ex.message: diff --git a/third_party/bigframes_vendored/google_cloud_bigquery/retry.py b/third_party/bigframes_vendored/google_cloud_bigquery/retry.py new file mode 100644 index 0000000000..15ecda4fbc --- /dev/null +++ b/third_party/bigframes_vendored/google_cloud_bigquery/retry.py @@ -0,0 +1,220 @@ +# Original: https://github.com/googleapis/python-bigquery/blob/main/google/cloud/bigquery/retry.py +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.api_core import exceptions, retry +import google.api_core.future.polling +from google.auth import exceptions as auth_exceptions # type: ignore +import requests.exceptions + +_RETRYABLE_REASONS = frozenset( + ["rateLimitExceeded", "backendError", "internalError", "badGateway"] +) + +_UNSTRUCTURED_RETRYABLE_TYPES = ( + ConnectionError, + exceptions.TooManyRequests, + exceptions.InternalServerError, + exceptions.BadGateway, + exceptions.ServiceUnavailable, + requests.exceptions.ChunkedEncodingError, + requests.exceptions.ConnectionError, + requests.exceptions.Timeout, + auth_exceptions.TransportError, +) + +_MINUTE_IN_SECONDS = 60.0 +_HOUR_IN_SECONDS = 60.0 * _MINUTE_IN_SECONDS +_DEFAULT_RETRY_DEADLINE = 10.0 * _MINUTE_IN_SECONDS + +# Ambiguous errors (e.g. internalError, backendError, rateLimitExceeded) retry +# until the full `_DEFAULT_RETRY_DEADLINE`. This is because the +# `jobs.getQueryResults` REST API translates a job failure into an HTTP error. +# +# TODO(https://github.com/googleapis/python-bigquery/issues/1903): Investigate +# if we can fail early for ambiguous errors in `QueryJob.result()`'s call to +# the `jobs.getQueryResult` API. +# +# We need `_DEFAULT_JOB_DEADLINE` to be some multiple of +# `_DEFAULT_RETRY_DEADLINE` to allow for a few retries after the retry +# timeout is reached. +# +# Note: This multiple should actually be a multiple of +# (2 * _DEFAULT_RETRY_DEADLINE). After an ambiguous exception, the first +# call from `job_retry()` refreshes the job state without actually restarting +# the query. The second `job_retry()` actually restarts the query. For a more +# detailed explanation, see the comments where we set `restart_query_job = True` +# in `QueryJob.result()`'s inner `is_job_done()` function. +_DEFAULT_JOB_DEADLINE = 2.0 * (2.0 * _DEFAULT_RETRY_DEADLINE) + + +def _should_retry(exc): + """Predicate for determining when to retry. + + We retry if and only if the 'reason' is 'backendError' + or 'rateLimitExceeded'. + """ + if not hasattr(exc, "errors") or len(exc.errors) == 0: + # Check for unstructured error returns, e.g. from GFE + return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) + + reason = exc.errors[0]["reason"] + return reason in _RETRYABLE_REASONS + + +DEFAULT_RETRY = retry.Retry(predicate=_should_retry, deadline=_DEFAULT_RETRY_DEADLINE) +"""The default retry object. + +Any method with a ``retry`` parameter will be retried automatically, +with reasonable defaults. To disable retry, pass ``retry=None``. +To modify the default retry behavior, call a ``with_XXX`` method +on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds, +pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``. +""" + + +def _should_retry_get_job_conflict(exc): + """Predicate for determining when to retry a jobs.get call after a conflict error. + + Sometimes we get a 404 after a Conflict. In this case, we + have pretty high confidence that by retrying the 404, we'll + (hopefully) eventually recover the job. + https://github.com/googleapis/python-bigquery/issues/2134 + + Note: we may be able to extend this to user-specified predicates + after https://github.com/googleapis/python-api-core/issues/796 + to tweak existing Retry object predicates. + """ + return isinstance(exc, exceptions.NotFound) or _should_retry(exc) + + +# Pick a deadline smaller than our other deadlines since we want to timeout +# before those expire. +_DEFAULT_GET_JOB_CONFLICT_DEADLINE = _DEFAULT_RETRY_DEADLINE / 3.0 +_DEFAULT_GET_JOB_CONFLICT_RETRY = retry.Retry( + predicate=_should_retry_get_job_conflict, + deadline=_DEFAULT_GET_JOB_CONFLICT_DEADLINE, +) +"""Private, may be removed in future.""" + + +# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We +# briefly had a default timeout, but even setting it at more than twice the +# theoretical server-side default timeout of 2 minutes was not enough for +# complex queries. See: +# https://github.com/googleapis/python-bigquery/issues/970#issuecomment-921934647 +DEFAULT_TIMEOUT = None +"""The default API timeout. + +This is the time to wait per request. To adjust the total wait time, set a +deadline on the retry object. +""" + +job_retry_reasons = ( + "rateLimitExceeded", + "backendError", + "internalError", + "jobBackendError", + "jobInternalError", + "jobRateLimitExceeded", +) + + +def _job_should_retry(exc): + # Sometimes we have ambiguous errors, such as 'backendError' which could + # be due to an API problem or a job problem. For these, make sure we retry + # our is_job_done() function. + # + # Note: This won't restart the job unless we know for sure it's because of + # the job status and set restart_query_job = True in that loop. This means + # that we might end up calling this predicate twice for the same job + # but from different paths: (1) from jobs.getQueryResults RetryError and + # (2) from translating the job error from the body of a jobs.get response. + # + # Note: If we start retrying job types other than queries where we don't + # call the problematic getQueryResults API to check the status, we need + # to provide a different predicate, as there shouldn't be ambiguous + # errors in those cases. + if isinstance(exc, exceptions.RetryError): + exc = exc.cause + + # Per https://github.com/googleapis/python-bigquery/issues/1929, sometimes + # retriable errors make their way here. Because of the separate + # `restart_query_job` logic to make sure we aren't restarting non-failed + # jobs, it should be safe to continue and not totally fail our attempt at + # waiting for the query to complete. + if _should_retry(exc): + return True + + if not hasattr(exc, "errors") or len(exc.errors) == 0: + return False + + reason = exc.errors[0]["reason"] + return reason in job_retry_reasons + + +DEFAULT_JOB_RETRY = retry.Retry( + predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE +) +""" +The default job retry object. +""" + + +DEFAULT_ML_JOB_RETRY = retry.Retry( + predicate=_job_should_retry, deadline=_HOUR_IN_SECONDS +) +""" +The default job retry object for AI/ML jobs. + +Such jobs can take a long time to fail. See: b/436586523. +""" + + +def _query_job_insert_should_retry(exc): + # Per https://github.com/googleapis/python-bigquery/issues/2134, sometimes + # we get a 404 error. In this case, if we get this far, assume that the job + # doesn't actually exist and try again. We can't add 404 to the default + # job_retry because that happens for errors like "this table does not + # exist", which probably won't resolve with a retry. + if isinstance(exc, exceptions.RetryError): + exc = exc.cause + + if isinstance(exc, exceptions.NotFound): + message = exc.message + # Don't try to retry table/dataset not found, just job not found. + # The URL contains jobs, so use whitespace to disambiguate. + return message is not None and " job" in message.lower() + + return _job_should_retry(exc) + + +_DEFAULT_QUERY_JOB_INSERT_RETRY = retry.Retry( + predicate=_query_job_insert_should_retry, + # jobs.insert doesn't wait for the job to complete, so we don't need the + # long _DEFAULT_JOB_DEADLINE for this part. + deadline=_DEFAULT_RETRY_DEADLINE, +) +"""Private, may be removed in future.""" + + +DEFAULT_GET_JOB_TIMEOUT = 128 +""" +Default timeout for Client.get_job(). +""" + +POLLING_DEFAULT_VALUE = google.api_core.future.polling.PollingFuture._DEFAULT_VALUE +""" +Default value defined in google.api_core.future.polling.PollingFuture. +"""