Skip to content

Commit 9673d8b

Browse files
authored
Merge pull request apache#15117 from ajamato/bq_java_read_metrics
[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage API reads
2 parents ccf55e7 + 86db595 commit 9673d8b

11 files changed

+531
-60
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,23 @@ interface BigQueryServerStream<T> extends Iterable<T>, Serializable {
231231

232232
/** An interface representing a client object for making calls to the BigQuery Storage API. */
233233
interface StorageClient extends AutoCloseable {
234-
/** Create a new read session against an existing table. */
234+
/**
235+
* Create a new read session against an existing table. This method also records the request
236+
* count metric, using the table id contained in the request.
237+
*/
235238
ReadSession createReadSession(CreateReadSessionRequest request);
236239

237240
/** Read rows in the context of a specific read stream. */
238241
BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
239242

243+
/* This method variant records the request count metric, using the fullTableId argument. */
244+
BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request, String fullTableId);
245+
240246
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request);
241247

248+
/* This method variant records the request count metric, using the fullTableId argument. */
249+
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId);
250+
242251
/**
243252
* Close the client object.
244253
*

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.api.client.util.Sleeper;
3030
import com.google.api.core.ApiFuture;
3131
import com.google.api.gax.core.FixedCredentialsProvider;
32+
import com.google.api.gax.rpc.ApiException;
3233
import com.google.api.gax.rpc.FixedHeaderProvider;
3334
import com.google.api.gax.rpc.HeaderProvider;
3435
import com.google.api.gax.rpc.ServerStream;
@@ -100,7 +101,6 @@
100101
import java.util.concurrent.TimeoutException;
101102
import java.util.concurrent.atomic.AtomicLong;
102103
import java.util.stream.Collectors;
103-
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
104104
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
105105
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
106106
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -843,22 +843,7 @@ <T> long insertAll(
843843
idsToPublish = insertIdList;
844844
}
845845

846-
HashMap<String, String> baseLabels = new HashMap<String, String>();
847-
// TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
848-
// SpecMonitoringInfoValidator from dropping the MonitoringInfo.
849-
baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
850-
baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
851-
baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
852-
baseLabels.put(
853-
MonitoringInfoConstants.Labels.RESOURCE,
854-
GcpResourceIdentifiers.bigQueryTable(
855-
ref.getProjectId(), ref.getDatasetId(), ref.getTableId()));
856-
baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, ref.getProjectId());
857-
baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, ref.getDatasetId());
858-
baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, ref.getTableId());
859-
860-
ServiceCallMetric serviceCallMetric =
861-
new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
846+
ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
862847

863848
while (true) {
864849
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
@@ -1363,21 +1348,80 @@ private StorageClientImpl(BigQueryOptions options) throws IOException {
13631348
this.client = BigQueryReadClient.create(settingsBuilder.build());
13641349
}
13651350

1351+
// The methods of BigQueryReadClient are final, so they cannot be mocked with Mockito for
1352+
// testing.
1353+
// This wrapper method can be mocked in tests instead.
1354+
ReadSession callCreateReadSession(CreateReadSessionRequest request) {
1355+
return client.createReadSession(request);
1356+
}
1357+
13661358
@Override
13671359
public ReadSession createReadSession(CreateReadSessionRequest request) {
1368-
return client.createReadSession(request);
1360+
TableReference tableReference =
1361+
BigQueryUtils.toTableReference(request.getReadSession().getTable());
1362+
ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
1363+
try {
1364+
ReadSession session = callCreateReadSession(request);
1365+
if (serviceCallMetric != null) {
1366+
serviceCallMetric.call("ok");
1367+
}
1368+
return session;
1369+
1370+
} catch (ApiException e) {
1371+
if (serviceCallMetric != null) {
1372+
serviceCallMetric.call(e.getStatusCode().getCode().name());
1373+
}
1374+
throw e;
1375+
}
13691376
}
13701377

13711378
@Override
13721379
public BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request) {
13731380
return new BigQueryServerStreamImpl<>(client.readRowsCallable().call(request));
13741381
}
13751382

1383+
@Override
1384+
public BigQueryServerStream<ReadRowsResponse> readRows(
1385+
ReadRowsRequest request, String fullTableId) {
1386+
TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
1387+
ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
1388+
try {
1389+
BigQueryServerStream<ReadRowsResponse> response = readRows(request);
1390+
serviceCallMetric.call("ok");
1391+
return response;
1392+
} catch (ApiException e) {
1393+
if (serviceCallMetric != null) {
1394+
serviceCallMetric.call(e.getStatusCode().getCode().name());
1395+
}
1396+
throw e;
1397+
}
1398+
}
1399+
13761400
@Override
13771401
public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) {
13781402
return client.splitReadStream(request);
13791403
}
13801404

1405+
@Override
1406+
public SplitReadStreamResponse splitReadStream(
1407+
SplitReadStreamRequest request, String fullTableId) {
1408+
TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
1409+
ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
1410+
try {
1411+
SplitReadStreamResponse response = splitReadStream(request);
1412+
1413+
if (serviceCallMetric != null) {
1414+
serviceCallMetric.call("ok");
1415+
}
1416+
return response;
1417+
} catch (ApiException e) {
1418+
if (serviceCallMetric != null) {
1419+
serviceCallMetric.call(e.getStatusCode().getCode().name());
1420+
}
1421+
throw e;
1422+
}
1423+
}
1424+
13811425
@Override
13821426
public void close() {
13831427
client.close();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,9 @@ protected Table getTargetTable(BigQueryOptions options) throws Exception {
175175
kmsKey);
176176
return bqServices.getDatasetService(options).getTable(queryResultTable);
177177
}
178+
179+
@Override
180+
protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception {
181+
return null;
182+
}
178183
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ abstract class BigQueryStorageSourceBase<T> extends BoundedSource<T> {
9494
*/
9595
protected abstract Table getTargetTable(BigQueryOptions options) throws Exception;
9696

97+
protected abstract @Nullable String getTargetTableId(BigQueryOptions options) throws Exception;
98+
9799
@Override
98100
public Coder<T> getOutputCoder() {
99101
return outputCoder;
@@ -105,9 +107,16 @@ public List<BigQueryStorageStreamSource<T>> split(
105107
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
106108
Table targetTable = getTargetTable(bqOptions);
107109

108-
ReadSession.Builder readSessionBuilder =
109-
ReadSession.newBuilder()
110-
.setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
110+
String tableReferenceId = "";
111+
if (targetTable != null) {
112+
tableReferenceId = BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
113+
} else {
114+
// If the table does not exist targetTable will be null.
115+
// Construct the table id if we can generate it. For error recording/logging.
116+
tableReferenceId = getTargetTableId(bqOptions);
117+
}
118+
119+
ReadSession.Builder readSessionBuilder = ReadSession.newBuilder().setTable(tableReferenceId);
111120

112121
if (selectedFieldsProvider != null || rowRestrictionProvider != null) {
113122
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
2222
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
2323

24+
import com.google.api.gax.rpc.ApiException;
2425
import com.google.api.gax.rpc.FailedPreconditionException;
26+
import com.google.api.services.bigquery.model.TableReference;
2527
import com.google.api.services.bigquery.model.TableSchema;
2628
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
2729
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -33,6 +35,7 @@
3335
import java.util.Iterator;
3436
import java.util.List;
3537
import java.util.NoSuchElementException;
38+
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
3639
import org.apache.beam.sdk.coders.Coder;
3740
import org.apache.beam.sdk.io.BoundedSource;
3841
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
@@ -162,6 +165,9 @@ public static class BigQueryStorageStreamReader<T> extends BoundedSource.Bounded
162165
private long rowsConsumedFromCurrentResponse;
163166
private long totalRowsInCurrentResponse;
164167

168+
private TableReference tableReference;
169+
private ServiceCallMetric serviceCallMetric;
170+
165171
private BigQueryStorageStreamReader(
166172
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
167173
this.source = source;
@@ -186,7 +192,9 @@ public synchronized boolean start() throws IOException {
186192
.setOffset(currentOffset)
187193
.build();
188194

189-
responseStream = storageClient.readRows(request);
195+
tableReference = BigQueryUtils.toTableReference(source.readSession.getTable());
196+
serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
197+
responseStream = storageClient.readRows(request, source.readSession.getTable());
190198
responseIterator = responseStream.iterator();
191199
LOG.info("Started BigQuery Storage API read from stream {}.", source.readStream.getName());
192200
return readNextRecord();
@@ -205,7 +213,23 @@ private synchronized boolean readNextRecord() throws IOException {
205213
return false;
206214
}
207215

208-
ReadRowsResponse response = responseIterator.next();
216+
ReadRowsResponse response;
217+
try {
218+
response = responseIterator.next();
219+
// Since we don't have a direct hook to the underlying
220+
// API call, record success every time we read a record successfully.
221+
if (serviceCallMetric != null) {
222+
serviceCallMetric.call("ok");
223+
}
224+
} catch (ApiException e) {
225+
// Occasionally the iterator will fail and raise an exception.
226+
// Capture it here and record the error in the metric.
227+
if (serviceCallMetric != null) {
228+
serviceCallMetric.call(e.getStatusCode().getCode().name());
229+
}
230+
throw e;
231+
}
232+
209233
progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
210234
progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
211235
totalRowsInCurrentResponse = response.getRowCount();
@@ -315,7 +339,8 @@ public BoundedSource<T> splitAtFraction(double fraction) {
315339
ReadRowsRequest.newBuilder()
316340
.setReadStream(splitResponse.getPrimaryStream().getName())
317341
.setOffset(currentOffset + 1)
318-
.build());
342+
.build(),
343+
source.readSession.getTable());
319344
newResponseIterator = newResponseStream.iterator();
320345
newResponseIterator.hasNext();
321346
} catch (FailedPreconditionException e) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,39 @@ public void populateDisplayData(DisplayData.Builder builder) {
102102

103103
@Override
104104
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
105-
return getTargetTable(options.as(BigQueryOptions.class)).getNumBytes();
105+
Table table = getTargetTable(options.as(BigQueryOptions.class));
106+
if (table != null) {
107+
return table.getNumBytes();
108+
}
109+
// If the table does not exist, then it will be null.
110+
// Avoid the NullPointerException here, allow a more meaningful table "not_found"
111+
// error to be shown to the user, upon table read.
112+
return 0;
113+
}
114+
115+
@Override
116+
protected String getTargetTableId(BigQueryOptions options) throws Exception {
117+
TableReference tableReference = tableReferenceProvider.get();
118+
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
119+
checkState(
120+
!Strings.isNullOrEmpty(options.getProject()),
121+
"No project ID set in %s or %s, cannot construct a complete %s",
122+
TableReference.class.getSimpleName(),
123+
BigQueryOptions.class.getSimpleName(),
124+
TableReference.class.getSimpleName());
125+
LOG.info(
126+
"Project ID not set in {}. Using default project from {}.",
127+
TableReference.class.getSimpleName(),
128+
BigQueryOptions.class.getSimpleName());
129+
tableReference.setProjectId(options.getProject());
130+
}
131+
return String.format(
132+
"projects/%s/datasets/%s/tables/%s",
133+
tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId());
106134
}
107135

108136
@Override
109-
protected Table getTargetTable(BigQueryOptions options) throws Exception {
137+
protected @Nullable Table getTargetTable(BigQueryOptions options) throws Exception {
110138
if (cachedTable.get() == null) {
111139
TableReference tableReference = tableReferenceProvider.get();
112140
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {

0 commit comments

Comments
 (0)