@@ -292,8 +292,10 @@ class BigQueryWrapper(object):
292
292
(e.g., find and create tables, query a table, etc.).
293
293
"""
294
294
295
- TEMP_TABLE = 'temp_table_'
296
- TEMP_DATASET = 'temp_dataset_'
295
+ # If updating following names, also update the corresponding pydocs in
296
+ # bigquery.py.
297
+ TEMP_TABLE = 'beam_temp_table_'
298
+ TEMP_DATASET = 'beam_temp_dataset_'
297
299
298
300
HISTOGRAM_METRIC_LOGGER = MetricLogger ()
299
301
@@ -313,6 +315,10 @@ def __init__(self, client=None, temp_dataset_id=None):
313
315
'latency_histogram_ms' ,
314
316
LinearBucket (0 , 20 , 3000 ),
315
317
BigQueryWrapper .HISTOGRAM_METRIC_LOGGER )
318
+ if temp_dataset_id and temp_dataset_id .startswith (self .TEMP_DATASET ):
319
+ raise ValueError (
320
+ 'User provided temp dataset ID cannot start with %r' %
321
+ self .TEMP_DATASET )
316
322
self .temp_dataset_id = temp_dataset_id or self ._get_temp_dataset ()
317
323
self .created_temp_dataset = False
318
324
@@ -799,18 +805,22 @@ def get_table_location(self, project_id, dataset_id, table_id):
799
805
table = self .get_table (project_id , dataset_id , table_id )
800
806
return table .location
801
807
808
+ # Returns true if the temporary dataset was provided by the user.
809
+ def is_user_configured_dataset (self ):
810
+ return (
811
+ self .temp_dataset_id and
812
+ not self .temp_dataset_id .startswith (self .TEMP_DATASET ))
813
+
802
814
@retry .with_exponential_backoff (
803
815
num_retries = MAX_RETRIES ,
804
816
retry_filter = retry .retry_on_server_errors_and_timeout_filter )
805
817
def create_temporary_dataset (self , project_id , location ):
806
- is_user_configured_dataset = \
807
- not self .temp_dataset_id .startswith (self .TEMP_DATASET )
808
818
# Check if dataset exists to make sure that the temporary id is unique
809
819
try :
810
820
self .client .datasets .Get (
811
821
bigquery .BigqueryDatasetsGetRequest (
812
822
projectId = project_id , datasetId = self .temp_dataset_id ))
813
- if project_id is not None and not is_user_configured_dataset :
823
+ if project_id is not None and not self . is_user_configured_dataset () :
814
824
# Unittests don't pass projectIds so they can be run without error
815
825
# User configured datasets are allowed to pre-exist.
816
826
raise RuntimeError (
@@ -846,7 +856,13 @@ def clean_up_temporary_dataset(self, project_id):
846
856
else :
847
857
raise
848
858
try :
849
- self ._delete_dataset (temp_table .projectId , temp_table .datasetId , True )
859
+ # We do not want to delete temporary datasets configured by the user;
860
+ # instead, we only delete the temporary table in that case.
861
+ if not self .is_user_configured_dataset ():
862
+ self ._delete_dataset (temp_table .projectId , temp_table .datasetId , True )
863
+ else :
864
+ self ._delete_table (
865
+ temp_table .projectId , temp_table .datasetId , temp_table .tableId )
850
866
self .created_temp_dataset = False
851
867
except HttpError as exn :
852
868
if exn .status_code == 403 :
@@ -1305,8 +1321,10 @@ def _get_source_location(self):
1305
1321
1306
1322
def __enter__ (self ):
1307
1323
self .client = BigQueryWrapper (client = self .test_bigquery_client )
1308
- self .client .create_temporary_dataset (
1309
- self .executing_project , location = self ._get_source_location ())
1324
+ if not self .client .is_user_configured_dataset ():
1325
+ # Create a temporary dataset only when the user did not provide one.
1326
+ self .client .create_temporary_dataset (
1327
+ self .executing_project , location = self ._get_source_location ())
1310
1328
return self
1311
1329
1312
1330
def __exit__ (self , exception_type , exception_value , traceback ):
0 commit comments