Skip to content

Commit efbbf25

Browse files
authored
[BEAM-12778] Prevent unnecessary dry run requests to BQ (apache#15356)
* Prevent unnecessary dry run requests to BQ when temp dataset is provided by the user * Document that the temp dataset name prefix is reserved for Beam * User provided temporary datasets should not be deleted. * Raise an error if reserved dataset prefix is used * updates
1 parent b236713 commit efbbf25

File tree

3 files changed

+46
-34
lines changed

3 files changed

+46
-34
lines changed

sdks/python/apache_beam/io/gcp/bigquery.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -822,6 +822,9 @@ def read(self, range_tracker):
822822

823823
@check_accessible(['query'])
824824
def _setup_temporary_dataset(self, bq):
825+
if self.temp_dataset:
826+
# Temp dataset was provided by the user so we can just return.
827+
return
825828
location = bq.get_query_location(
826829
self._get_project(), self.query.get(), self.use_legacy_sql)
827830
bq.create_temporary_dataset(self._get_project(), location)
@@ -2200,9 +2203,14 @@ class ReadFromBigQuery(PTransform):
22002203
#avro_conversions
22012204
temp_dataset (``apache_beam.io.gcp.internal.clients.bigquery.\
22022205
DatasetReference``):
2203-
The dataset in which to create temporary tables when performing file
2204-
loads. By default, a new dataset is created in the execution project for
2205-
temporary tables.
2206+
Temporary dataset reference to use when reading from BigQuery using a
2207+
query. When reading using a query, BigQuery source will create a
2208+
temporary dataset and a temporary table to store the results of the
2209+
query. With this option, you can set an existing dataset to create the
2210+
temporary table in. BigQuery source will create a temporary table in
2211+
that dataset, and will remove it once it is not needed. Job needs access
2212+
to create and delete tables within the given dataset. Dataset name
2213+
should *not* start with the reserved prefix `beam_temp_dataset_`.
22062214
"""
22072215
class Method(object):
22082216
EXPORT = 'EXPORT' # This is currently the default.

sdks/python/apache_beam/io/gcp/bigquery_test.py

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -451,15 +451,16 @@ def test_get_destination_uri_fallback_temp_location(self):
451451
'empty, using temp_location instead'
452452
])
453453

454+
@mock.patch.object(BigQueryWrapper, '_delete_table')
454455
@mock.patch.object(BigQueryWrapper, '_delete_dataset')
455456
@mock.patch('apache_beam.io.gcp.internal.clients.bigquery.BigqueryV2')
456-
def test_temp_dataset_location_is_configurable(self, api, delete_dataset):
457+
def test_temp_dataset_is_configurable(
458+
self, api, delete_dataset, delete_table):
457459
temp_dataset = bigquery.DatasetReference(
458460
projectId='temp-project', datasetId='bq_dataset')
459461
bq = BigQueryWrapper(client=api, temp_dataset_id=temp_dataset.datasetId)
460462
gcs_location = 'gs://gcs_location'
461463

462-
# bq.get_or_create_dataset.return_value = temp_dataset
463464
c = beam.io.gcp.bigquery._CustomBigQuerySource(
464465
query='select * from test_table',
465466
gcs_location=gcs_location,
@@ -470,30 +471,15 @@ def test_temp_dataset_location_is_configurable(self, api, delete_dataset):
470471
project='execution_project',
471472
**{'temp_dataset': temp_dataset})
472473

473-
api.datasets.Get.side_effect = HttpError({
474-
'status_code': 404, 'status': 404
475-
},
476-
'',
477-
'')
478-
479474
c._setup_temporary_dataset(bq)
480-
api.datasets.Insert.assert_called_with(
481-
bigquery.BigqueryDatasetsInsertRequest(
482-
dataset=bigquery.Dataset(datasetReference=temp_dataset),
483-
projectId=temp_dataset.projectId))
475+
api.datasets.assert_not_called()
484476

485-
api.datasets.Get.return_value = temp_dataset
486-
api.datasets.Get.side_effect = None
477+
# User provided temporary dataset should not be deleted but the temporary
478+
# table created by Beam should be deleted.
487479
bq.clean_up_temporary_dataset(temp_dataset.projectId)
488-
delete_dataset.assert_called_with(
489-
temp_dataset.projectId, temp_dataset.datasetId, True)
490-
491-
self.assertEqual(
492-
bq._get_temp_table(temp_dataset.projectId),
493-
bigquery.TableReference(
494-
projectId=temp_dataset.projectId,
495-
datasetId=temp_dataset.datasetId,
496-
tableId=BigQueryWrapper.TEMP_TABLE + bq._temporary_table_suffix))
480+
delete_dataset.assert_not_called()
481+
delete_table.assert_called_with(
482+
temp_dataset.projectId, temp_dataset.datasetId, mock.ANY)
497483

498484

499485
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')

sdks/python/apache_beam/io/gcp/bigquery_tools.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,10 @@ class BigQueryWrapper(object):
292292
(e.g., find and create tables, query a table, etc.).
293293
"""
294294

295-
TEMP_TABLE = 'temp_table_'
296-
TEMP_DATASET = 'temp_dataset_'
295+
# If updating following names, also update the corresponding pydocs in
296+
# bigquery.py.
297+
TEMP_TABLE = 'beam_temp_table_'
298+
TEMP_DATASET = 'beam_temp_dataset_'
297299

298300
HISTOGRAM_METRIC_LOGGER = MetricLogger()
299301

@@ -313,6 +315,10 @@ def __init__(self, client=None, temp_dataset_id=None):
313315
'latency_histogram_ms',
314316
LinearBucket(0, 20, 3000),
315317
BigQueryWrapper.HISTOGRAM_METRIC_LOGGER)
318+
if temp_dataset_id and temp_dataset_id.startswith(self.TEMP_DATASET):
319+
raise ValueError(
320+
'User provided temp dataset ID cannot start with %r' %
321+
self.TEMP_DATASET)
316322
self.temp_dataset_id = temp_dataset_id or self._get_temp_dataset()
317323
self.created_temp_dataset = False
318324

@@ -799,18 +805,22 @@ def get_table_location(self, project_id, dataset_id, table_id):
799805
table = self.get_table(project_id, dataset_id, table_id)
800806
return table.location
801807

808+
# Returns True if the temporary dataset was provided by the user.
809+
def is_user_configured_dataset(self):
810+
return (
811+
self.temp_dataset_id and
812+
not self.temp_dataset_id.startswith(self.TEMP_DATASET))
813+
802814
@retry.with_exponential_backoff(
803815
num_retries=MAX_RETRIES,
804816
retry_filter=retry.retry_on_server_errors_and_timeout_filter)
805817
def create_temporary_dataset(self, project_id, location):
806-
is_user_configured_dataset = \
807-
not self.temp_dataset_id.startswith(self.TEMP_DATASET)
808818
# Check if dataset exists to make sure that the temporary id is unique
809819
try:
810820
self.client.datasets.Get(
811821
bigquery.BigqueryDatasetsGetRequest(
812822
projectId=project_id, datasetId=self.temp_dataset_id))
813-
if project_id is not None and not is_user_configured_dataset:
823+
if project_id is not None and not self.is_user_configured_dataset():
814824
# Unittests don't pass projectIds so they can be run without error
815825
# User configured datasets are allowed to pre-exist.
816826
raise RuntimeError(
@@ -846,7 +856,13 @@ def clean_up_temporary_dataset(self, project_id):
846856
else:
847857
raise
848858
try:
849-
self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
859+
# We do not want to delete temporary datasets configured by the user, hence
860+
# we just delete the temporary table in that case.
861+
if not self.is_user_configured_dataset():
862+
self._delete_dataset(temp_table.projectId, temp_table.datasetId, True)
863+
else:
864+
self._delete_table(
865+
temp_table.projectId, temp_table.datasetId, temp_table.tableId)
850866
self.created_temp_dataset = False
851867
except HttpError as exn:
852868
if exn.status_code == 403:
@@ -1305,8 +1321,10 @@ def _get_source_location(self):
13051321

13061322
def __enter__(self):
13071323
self.client = BigQueryWrapper(client=self.test_bigquery_client)
1308-
self.client.create_temporary_dataset(
1309-
self.executing_project, location=self._get_source_location())
1324+
if not self.client.is_user_configured_dataset():
1325+
# No temp dataset was provided by the user, so we have to create one here.
1326+
self.client.create_temporary_dataset(
1327+
self.executing_project, location=self._get_source_location())
13101328
return self
13111329

13121330
def __exit__(self, exception_type, exception_value, traceback):

0 commit comments

Comments
 (0)