Skip to content

Commit 55fb420

Browse files
authored
Merge pull request GoogleCloudPlatform#1061 from GoogleCloudPlatform/tswast-bq
Use futures API to wait for jobs to finish.
2 parents 2a93787 + 473fd9d commit 55fb420

13 files changed

+160
-266
lines changed

bigquery/cloud-client/async_query.py

-64
This file was deleted.

bigquery/cloud-client/export_data_to_gcs.py

+1-13
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
"""
2727

2828
import argparse
29-
import time
3029
import uuid
3130

3231
from google.cloud import bigquery
@@ -42,23 +41,12 @@ def export_data_to_gcs(dataset_name, table_name, destination):
4241
job_name, table, destination)
4342

4443
job.begin()
45-
46-
wait_for_job(job)
44+
job.result() # Wait for job to complete
4745

4846
print('Exported {}:{} to {}'.format(
4947
dataset_name, table_name, destination))
5048

5149

52-
def wait_for_job(job):
53-
while True:
54-
job.reload()
55-
if job.state == 'DONE':
56-
if job.error_result:
57-
raise RuntimeError(job.errors)
58-
return
59-
time.sleep(1)
60-
61-
6250
if __name__ == '__main__':
6351
parser = argparse.ArgumentParser(
6452
description=__doc__,

bigquery/cloud-client/load_data_from_file.py

+1-12
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
"""
2727

2828
import argparse
29-
import time
3029

3130
from google.cloud import bigquery
3231

@@ -45,22 +44,12 @@ def load_data_from_file(dataset_name, table_name, source_file_name):
4544
job = table.upload_from_file(
4645
source_file, source_format='text/csv')
4746

48-
wait_for_job(job)
47+
job.result() # Wait for job to complete
4948

5049
print('Loaded {} rows into {}:{}.'.format(
5150
job.output_rows, dataset_name, table_name))
5251

5352

54-
def wait_for_job(job):
55-
while True:
56-
job.reload()
57-
if job.state == 'DONE':
58-
if job.error_result:
59-
raise RuntimeError(job.errors)
60-
return
61-
time.sleep(1)
62-
63-
6453
if __name__ == '__main__':
6554
parser = argparse.ArgumentParser(
6655
description=__doc__,

bigquery/cloud-client/load_data_from_gcs.py

+1-13
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
"""
2727

2828
import argparse
29-
import time
3029
import uuid
3130

3231
from google.cloud import bigquery
@@ -42,23 +41,12 @@ def load_data_from_gcs(dataset_name, table_name, source):
4241
job_name, table, source)
4342

4443
job.begin()
45-
46-
wait_for_job(job)
44+
job.result() # Wait for job to complete
4745

4846
print('Loaded {} rows into {}:{}.'.format(
4947
job.output_rows, dataset_name, table_name))
5048

5149

52-
def wait_for_job(job):
53-
while True:
54-
job.reload()
55-
if job.state == 'DONE':
56-
if job.error_result:
57-
raise RuntimeError(job.errors)
58-
return
59-
time.sleep(1)
60-
61-
6250
if __name__ == '__main__':
6351
parser = argparse.ArgumentParser(
6452
description=__doc__,

bigquery/cloud-client/query.py

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2016 Google Inc. All Rights Reserved.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
"""Command-line application to perform queries in BigQuery.
18+
19+
For more information, see the README.rst.
20+
21+
Example invocation:
22+
$ python query.py '#standardSQL
23+
SELECT corpus
24+
FROM `publicdata.samples.shakespeare`
25+
GROUP BY corpus
26+
ORDER BY corpus'
27+
"""
28+
29+
import argparse
30+
import uuid
31+
32+
from google.cloud import bigquery
33+
34+
35+
def query(query):
36+
client = bigquery.Client()
37+
query_job = client.run_async_query(str(uuid.uuid4()), query)
38+
39+
query_job.begin()
40+
query_job.result() # Wait for job to complete.
41+
42+
# Print the results.
43+
destination_table = query_job.destination
44+
destination_table.reload()
45+
for row in destination_table.fetch_data():
46+
print(row)
47+
48+
49+
def query_standard_sql(query):
50+
client = bigquery.Client()
51+
query_job = client.run_async_query(str(uuid.uuid4()), query)
52+
# Set use_legacy_sql to False to use standard SQL syntax. See:
53+
# https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql
54+
query_job.use_legacy_sql = False
55+
56+
query_job.begin()
57+
query_job.result() # Wait for job to complete.
58+
59+
# Print the results.
60+
destination_table = query_job.destination
61+
destination_table.reload()
62+
for row in destination_table.fetch_data():
63+
print(row)
64+
65+
66+
if __name__ == '__main__':
67+
parser = argparse.ArgumentParser(
68+
description=__doc__,
69+
formatter_class=argparse.RawDescriptionHelpFormatter)
70+
parser.add_argument('query', help='BigQuery SQL Query.')
71+
parser.add_argument(
72+
'--use_standard_sql',
73+
action='store_true',
74+
help='Use standard SQL syntax.')
75+
76+
args = parser.parse_args()
77+
78+
if args.use_standard_sql:
79+
query_standard_sql(args.query)
80+
else:
81+
query(args.query)

bigquery/cloud-client/query_params.py

+35-33
Original file line numberDiff line numberDiff line change
@@ -25,30 +25,12 @@
2525

2626
import argparse
2727
import datetime
28-
import time
2928
import uuid
3029

3130
from google.cloud import bigquery
3231
import pytz
3332

3433

35-
def wait_for_job(job):
36-
while True:
37-
job.reload() # Refreshes the state via a GET request.
38-
if job.state == 'DONE':
39-
if job.error_result:
40-
raise RuntimeError(job.errors)
41-
return
42-
time.sleep(1)
43-
44-
45-
def print_results(query_results):
46-
"""Print the rows in the query's results."""
47-
rows = query_results.fetch_data(max_results=10)
48-
for row in rows:
49-
print(row)
50-
51-
5234
def query_positional_params(corpus, min_word_count):
5335
client = bigquery.Client()
5436
query = """
@@ -73,10 +55,14 @@ def query_positional_params(corpus, min_word_count):
7355
# See: https://cloud.google.com/bigquery/sql-reference/
7456
query_job.use_legacy_sql = False
7557

76-
# Start the query and wait for the job to complete.
7758
query_job.begin()
78-
wait_for_job(query_job)
79-
print_results(query_job.results())
59+
query_job.result() # Wait for job to complete
60+
61+
# Print the results.
62+
destination_table = query_job.destination
63+
destination_table.reload()
64+
for row in destination_table.fetch_data():
65+
print(row)
8066

8167

8268
def query_named_params(corpus, min_word_count):
@@ -97,10 +83,14 @@ def query_named_params(corpus, min_word_count):
9783
'min_word_count', 'INT64', min_word_count)))
9884
query_job.use_legacy_sql = False
9985

100-
# Start the query and wait for the job to complete.
10186
query_job.begin()
102-
wait_for_job(query_job)
103-
print_results(query_job.results())
87+
query_job.result() # Wait for job to complete
88+
89+
# Print the results.
90+
destination_table = query_job.destination
91+
destination_table.reload()
92+
for row in destination_table.fetch_data():
93+
print(row)
10494

10595

10696
def query_array_params(gender, states):
@@ -122,10 +112,14 @@ def query_array_params(gender, states):
122112
bigquery.ArrayQueryParameter('states', 'STRING', states)))
123113
query_job.use_legacy_sql = False
124114

125-
# Start the query and wait for the job to complete.
126115
query_job.begin()
127-
wait_for_job(query_job)
128-
print_results(query_job.results())
116+
query_job.result() # Wait for job to complete
117+
118+
# Print the results.
119+
destination_table = query_job.destination
120+
destination_table.reload()
121+
for row in destination_table.fetch_data():
122+
print(row)
129123

130124

131125
def query_timestamp_params(year, month, day, hour, minute):
@@ -142,10 +136,14 @@ def query_timestamp_params(year, month, day, hour, minute):
142136
year, month, day, hour, minute, tzinfo=pytz.UTC))])
143137
query_job.use_legacy_sql = False
144138

145-
# Start the query and wait for the job to complete.
146139
query_job.begin()
147-
wait_for_job(query_job)
148-
print_results(query_job.results())
140+
query_job.result() # Wait for job to complete
141+
142+
# Print the results.
143+
destination_table = query_job.destination
144+
destination_table.reload()
145+
for row in destination_table.fetch_data():
146+
print(row)
149147

150148

151149
def query_struct_params(x, y):
@@ -161,10 +159,14 @@ def query_struct_params(x, y):
161159
bigquery.ScalarQueryParameter('y', 'STRING', y))])
162160
query_job.use_legacy_sql = False
163161

164-
# Start the query and wait for the job to complete.
165162
query_job.begin()
166-
wait_for_job(query_job)
167-
print_results(query_job.results())
163+
query_job.result() # Wait for job to complete
164+
165+
# Print the results.
166+
destination_table = query_job.destination
167+
destination_table.reload()
168+
for row in destination_table.fetch_data():
169+
print(row)
168170

169171

170172
if __name__ == '__main__':

0 commit comments

Comments
 (0)