@@ -33,6 +33,7 @@
 
 import apache_beam as beam
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp import resource_identifiers
 from apache_beam.io.gcp.bigquery import TableRowJsonCoder
 from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR
 from apache_beam.io.gcp.bigquery_tools import AvroRowWriter
@@ -44,6 +45,8 @@
 from apache_beam.io.gcp.bigquery_tools import parse_table_reference
 from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
 from apache_beam.io.gcp.internal.clients import bigquery
+from apache_beam.metrics import monitoring_infos
+from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.value_provider import StaticValueProvider
 
@@ -422,6 +425,62 @@ def test_perform_load_job_with_source_stream(self):
     upload = client.jobs.Insert.call_args[1]["upload"]
     self.assertEqual(b'some,data', upload.stream.read())
 
+  def verify_write_call_metric(
+      self, project_id, dataset_id, table_id, status, count):
+    """Check if a metric was recorded for the BQ IO write API call."""
+    process_wide_monitoring_infos = list(
+        MetricsEnvironment.process_wide_container().
+        to_runner_api_monitoring_infos(None).values())
+    resource = resource_identifiers.BigQueryTable(
+        project_id, dataset_id, table_id)
+    labels = {
+        # TODO(ajamato): Add Ptransform label.
+        monitoring_infos.SERVICE_LABEL: 'BigQuery',
+        # Refer to any method which writes elements to BigQuery in batches
+        # as "BigQueryBatchWrite", e.g. the storage API's insertAll, or any
+        # future APIs introduced.
+        monitoring_infos.METHOD_LABEL: 'BigQueryBatchWrite',
+        monitoring_infos.RESOURCE_LABEL: resource,
+        monitoring_infos.BIGQUERY_PROJECT_ID_LABEL: project_id,
+        monitoring_infos.BIGQUERY_DATASET_LABEL: dataset_id,
+        monitoring_infos.BIGQUERY_TABLE_LABEL: table_id,
+        monitoring_infos.STATUS_LABEL: status,
+    }
+    expected_mi = monitoring_infos.int64_counter(
+        monitoring_infos.API_REQUEST_COUNT_URN, count, labels=labels)
+    expected_mi.ClearField("start_time")
+
+    found = False
+    for actual_mi in process_wide_monitoring_infos:
+      actual_mi.ClearField("start_time")
+      if expected_mi == actual_mi:
+        found = True
+        break
+    self.assertTrue(
+        found, "Did not find write call metric with status: %s" % status)
+
+  def test_insert_rows_sets_metric_on_failure(self):
+    from google.api_core import exceptions
+    MetricsEnvironment.process_wide_container().reset()
+    client = mock.Mock()
+    client.insert_rows_json = mock.Mock(
+        # Fail a few times, then succeed.
+        side_effect=[
+            exceptions.DeadlineExceeded("Deadline Exceeded"),
+            exceptions.InternalServerError("Internal Error"),
+            [],
+        ])
+    wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client)
+    wrapper.insert_rows("my_project", "my_dataset", "my_table", [])
+
+    # Expect two failing calls, then a success (i.e. two retries).
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "deadline_exceeded", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "internal", 1)
+    self.verify_write_call_metric(
+        "my_project", "my_dataset", "my_table", "ok", 1)
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestBigQueryReader(unittest.TestCase):
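
A quick way to debug a failing verify_write_call_metric assertion is to dump everything the process-wide metrics container currently holds. The sketch below is not part of the patch; it only uses names already imported in the diff above (MetricsEnvironment and the MonitoringInfo protos it returns).

from apache_beam.metrics.execution import MetricsEnvironment

def dump_process_wide_metrics():
  """Print the URN and labels of every process-wide monitoring info."""
  container = MetricsEnvironment.process_wide_container()
  # to_runner_api_monitoring_infos(None) returns a dict of MonitoringInfo
  # protos; their labels carry the service, method, resource and status
  # values that verify_write_call_metric matches against.
  for mi in container.to_runner_api_monitoring_infos(None).values():
    print(mi.urn, dict(mi.labels))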
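
The three expected statuses in test_insert_rows_sets_metric_on_failure follow the lower-cased canonical gRPC status names: DeadlineExceeded is counted under "deadline_exceeded", InternalServerError under "internal", and a successful call under "ok". A hypothetical helper mirroring that expectation (for illustration only; the real mapping lives in the Beam IO code under test):

from google.api_core import exceptions

def status_label_for(error=None):
  """Return the STATUS_LABEL value the test expects for a call outcome."""
  if error is None:
    return 'ok'  # the mocked insert_rows_json finally returns []
  if isinstance(error, exceptions.DeadlineExceeded):
    return 'deadline_exceeded'
  if isinstance(error, exceptions.InternalServerError):
    return 'internal'
  # Assumption: statuses outside this test's scope are not modelled here.
  return 'unknown'

assert status_label_for(exceptions.DeadlineExceeded("timeout")) == 'deadline_exceeded'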