Skip to content

Commit 947cbb7

Browse files
authored
[BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations (apache#15005)
* [BEAM-8376] Google Cloud Firestore Connector - Add Firestore v1 Read Operations Entry point for accessing Firestore V1 read methods is `FirestoreIO.v1().read()`. Currently supported read RPC methods: * `PartitionQuery` * `RunQuery` * `ListCollectionIds` * `ListDocuments` * `BatchGetDocuments` ### Unit Tests No external dependencies are needed for this suite. A large suite of unit tests has been added to cover most branches and error scenarios in the various components. Tests for input validation and bounds checking are also included in this suite. ### Integration Tests Integration tests for each type of RPC are present in `org.apache.beam.sdk.io.gcp.firestore.it.FirestoreV1IT`. All of these tests leverage `TestPipeline` and verify the expected Documents/Collections are all operated on during the test. * fix failing nullability check for cursor comparator * fix @nullable imports * fix typo * throw exception upon failing to determine restart point for batch get * add unit test for org.apache.beam.sdk.io.gcp.firestore.FirestoreV1.PartitionQuery.PartitionQueryResponseToRunQueryRequest.processElement * javadoc typo fixes from review * Explicitly set Client built in retry to max 1 attempt since we're taking care of all retry logic at a higher level * Clean up names of DoFn base classes to make them more accurate * rename FirestoreV1Fn -> FirestoreV1RpcAttemptContexts * restructure javadocs a bit to keep context close to code samples * decouple partition query from run query it can be advantageous to allow a customer to perform some post processing of a query before executing it. By decoupling PartitionQuery from directly outputting to RunQuery this is easily possible. * Add todo to jira issues for query integration improvements * spotless * fix incorrect nullable annotation
1 parent e542d1f commit 947cbb7

21 files changed

+4070
-98
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreDoFn.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,35 @@ abstract class FirestoreDoFn<InT, OutT> extends DoFn<InT, OutT> {
4646
@StartBundle
4747
public abstract void startBundle(DoFn<InT, OutT>.StartBundleContext context) throws Exception;
4848

49-
abstract static class WindowAwareDoFn<InT, OutT> extends FirestoreDoFn<InT, OutT> {
49+
/**
50+
* This class defines a common parent class for those DoFn which rely on the implicit window for
51+
* emitting values while processing a bundle.
52+
*/
53+
abstract static class ImplicitlyWindowedFirestoreDoFn<InT, OutT>
54+
extends FirestoreDoFn<InT, OutT> {
55+
/**
56+
* {@link ProcessContext#element() context.element()} must be non-null, otherwise a
57+
* NullPointerException will be thrown.
58+
*
59+
* @param context Context to source element from, and output to
60+
* @see org.apache.beam.sdk.transforms.DoFn.ProcessElement
61+
*/
62+
@ProcessElement
63+
public abstract void processElement(DoFn<InT, OutT>.ProcessContext context) throws Exception;
64+
65+
/** @see org.apache.beam.sdk.transforms.DoFn.FinishBundle */
66+
@FinishBundle
67+
public abstract void finishBundle() throws Exception;
68+
}
69+
70+
/**
71+
* This class defines a common parent class for those DoFn which must explicitly track the window
72+
* for emitting values while processing bundles. This is primarily necessary to support the
73+
* ability to emit values during {@link #finishBundle(DoFn.FinishBundleContext)} where an output
74+
* value must be explicitly correlated to a window.
75+
*/
76+
abstract static class ExplicitlyWindowedFirestoreDoFn<InT, OutT>
77+
extends FirestoreDoFn<InT, OutT> {
5078
/**
5179
* {@link ProcessContext#element() context.element()} must be non-null, otherwise a
5280
* NullPointerException will be thrown.

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@
1919

2020
import com.google.api.gax.core.FixedCredentialsProvider;
2121
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
22+
import com.google.api.gax.retrying.RetrySettings;
2223
import com.google.api.gax.rpc.ClientContext;
2324
import com.google.api.gax.rpc.FixedHeaderProvider;
2425
import com.google.cloud.firestore.FirestoreOptions.EmulatorCredentials;
2526
import com.google.cloud.firestore.v1.FirestoreSettings;
2627
import com.google.cloud.firestore.v1.stub.FirestoreStub;
2728
import com.google.cloud.firestore.v1.stub.GrpcFirestoreStub;
28-
import java.io.IOException;
2929
import java.io.Serializable;
3030
import java.security.SecureRandom;
3131
import java.util.Map;
@@ -76,6 +76,14 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
7676
}
7777
});
7878

79+
RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(1).build();
80+
81+
builder.applyToAllUnaryMethods(
82+
b -> {
83+
b.setRetrySettings(retrySettings);
84+
return null;
85+
});
86+
7987
FirestoreOptions firestoreOptions = options.as(FirestoreOptions.class);
8088
String emulatorHostPort = firestoreOptions.getEmulatorHost();
8189
if (emulatorHostPort != null) {
@@ -96,7 +104,7 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
96104

97105
ClientContext clientContext = ClientContext.create(builder.build());
98106
return GrpcFirestoreStub.create(clientContext);
99-
} catch (IOException e) {
107+
} catch (Exception e) {
100108
throw new RuntimeException(e);
101109
}
102110
}

0 commit comments

Comments
 (0)