
Commit 72dbbf2

Author: Alex Amato

[BEAM-12670] Fix regression: properly report API call metric on error for BQ streaming_inserts
1 parent 205fbb1 commit 72dbbf2

File tree: 2 files changed (+66, -1 lines)


sdks/python/apache_beam/io/gcp/bigquery_tools.py (+7, -1)
@@ -617,6 +617,8 @@ def _insert_all_rows(
     Docs for this BQ call: https://cloud.google.com/bigquery/docs/reference\
 /rest/v2/tabledata/insertAll."""
+    from google.api_core.exceptions import ClientError
+    from google.api_core.exceptions import GoogleAPICallError
     # The rows argument is a list of
     # bigquery.TableDataInsertAllRequest.RowsValueListEntry instances as
     # required by the InsertAll() method.
@@ -652,9 +654,13 @@ def _insert_all_rows(
       else:
         for insert_error in errors:
           service_call_metric.call(insert_error['errors'][0])
+    except (ClientError, GoogleAPICallError) as e:
+      # e.code.value contains the numeric http status code.
+      service_call_metric.call(e.code.value)
+      # Re-raise the exception so that we retry appropriately.
+      raise
     except HttpError as e:
       service_call_metric.call(e)
-
       # Re-raise the exception so that we retry appropriately.
       raise
     finally:
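
The new except clause relies on google-api-core exception classes exposing their HTTP status through .code; in current google-api-core releases .code is an http.HTTPStatus member, so .code.value is the plain integer the metric expects. A minimal sketch (illustration only, not part of this commit):

# Sketch only: demonstrates the .code.value attribute the fix depends on.
from google.api_core import exceptions

e = exceptions.DeadlineExceeded("Deadline Exceeded")
print(e.code)        # HTTPStatus.GATEWAY_TIMEOUT
print(e.code.value)  # 504 -- the numeric status passed to service_call_metric.call()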

sdks/python/apache_beam/io/gcp/bigquery_tools_test.py (+59)
@@ -33,6 +33,7 @@
 
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
@@ -44,6 +45,8 @@
 from apache_beam.io.gcp.bigquery_tools import parse_table_reference
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.value_provider import StaticValueProvider
 
@@ -422,6 +425,62 @@ def test_perform_load_job_with_source_stream(self):
     upload = client.jobs.Insert.call_args[1]["upload"]
     self.assertEqual(b'some,data', upload.stream.read())
 
+  def verify_write_call_metric(
+      self, project_id, dataset_id, table_id, status, count):
+    """Check if a metric was recorded for the BQ IO write API call."""
+    process_wide_monitoring_infos = list(
+        MetricsEnvironment.process_wide_container().
+        to_runner_api_monitoring_infos(None).values())
+    resource = resource_identifiers.BigQueryTable(
+        project_id, dataset_id, table_id)
+    labels = {
+        # TODO(ajamato): Add Ptransform label.
+        monitoring_infos.SERVICE_LABEL: 'BigQuery',
+        # Refer to any method which writes elements to BigQuery in batches
+        # as "BigQueryBatchWrite", i.e. the insertAll API or any future
+        # batch-write APIs introduced.
+        monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
+        monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
+        monitoring_infos.STATUS_LABEL: status,
+    }
+    expected_mi = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
+    expected_mi.ClearField("start_time")
+
+    found = False
+    for actual_mi in process_wide_monitoring_infos:
+      actual_mi.ClearField("start_time")
+      if expected_mi == actual_mi:
+        found = True
+        break
+    self.assertTrue(
+        found, "Did not find write call metric with status: %s" % status)
+
+  def test_insert_rows_sets_metric_on_failure(self):
+    from google.api_core import exceptions
+    MetricsEnvironment.process_wide_container().reset()
+    client = mock.Mock()
+    client.insert_rows_json = mock.Mock(
+        # Fail a few times, then succeed.
+        side_effect=[
+            exceptions.DeadlineExceeded("Deadline Exceeded"),
+            exceptions.InternalServerError("Internal Error"),
+            [],
+        ])
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    wrapper.insert_rows("my_project", "my_dataset", "my_table", [])
+
+    # Expect two failing calls, then a success (i.e. two retries).
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "deadline_exceeded", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "internal", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "ok", 1)
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):
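
The test asserts canonical status strings ('deadline_exceeded', 'internal', 'ok') rather than raw HTTP codes, so Beam's service-call metric evidently normalizes the numeric code before recording it. A hypothetical sketch of that normalization, consistent with the statuses asserted above (the helper name and table are illustrative, not Beam's actual implementation):

# Hypothetical mapping from HTTP status codes to the canonical status
# strings this test expects; Beam's real lookup may cover more codes.
HTTP_TO_CANONICAL_STATUS = {
    200: 'ok',
    500: 'internal',
    504: 'deadline_exceeded',
}

def canonical_status(http_code):
  # Fall back to the raw code when no canonical name is known.
  return HTTP_TO_CANONICAL_STATUS.get(http_code, str(http_code))

The new test itself can be run in isolation, e.g. with pytest's -k filter against sdks/python/apache_beam/io/gcp/bigquery_tools_test.py.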
