Skip to content

Commit 5e26d3b

Browse files
Merge pull request apache#15036: [BEAM-12435] Generalize S3FileSystem to support multiple URI schemes.
2 parents f9b9ccc + 410cad1 commit 5e26d3b

13 files changed

+1068
-148
lines changed

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575

7676
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
7777
* Added ability to use JdbcIO.Write.withResults without statement and preparedStatementSetter. ([BEAM-12511](https://issues.apache.org/jira/browse/BEAM-12511))
78+
* Added ability to register URI schemes to use the S3 protocol via FileIO. ([BEAM-12435](https://issues.apache.org/jira/browse/BEAM-12435))
7879

7980
## New Features / Improvements
8081

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.aws.s3;
19+
20+
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
21+
22+
import com.google.auto.service.AutoService;
23+
import javax.annotation.Nonnull;
24+
import org.apache.beam.sdk.annotations.Experimental;
25+
import org.apache.beam.sdk.annotations.Experimental.Kind;
26+
import org.apache.beam.sdk.io.aws.options.S3Options;
27+
import org.apache.beam.sdk.options.PipelineOptions;
28+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
29+
30+
/**
 * Registers the "s3" URI scheme to be handled by {@link S3FileSystem}.
 *
 * <p>This is the default registrar, discovered via {@link AutoService}/{@code ServiceLoader}. It
 * maps the standard {@code s3://} scheme to a configuration derived from the pipeline's {@link
 * S3Options}. Additional schemes can be supported by providing further {@link
 * S3FileSystemSchemeRegistrar} implementations.
 */
@AutoService(S3FileSystemSchemeRegistrar.class)
@Experimental(Kind.FILESYSTEM)
public class DefaultS3FileSystemSchemeRegistrar implements S3FileSystemSchemeRegistrar {

  /**
   * Returns a single {@link S3FileSystemConfiguration} for the "s3" scheme, built from {@code
   * options} viewed as {@link S3Options}.
   *
   * @param options pipeline options; must be non-null (set via {@code
   *     FileSystems.setDefaultPipelineOptions()})
   */
  @Override
  public Iterable<S3FileSystemConfiguration> fromOptions(@Nonnull PipelineOptions options) {
    checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
    // fromS3Options defaults the scheme to "s3"; see S3FileSystemConfiguration.fromS3Options.
    return ImmutableList.of(
        S3FileSystemConfiguration.fromS3Options(options.as(S3Options.class)).build());
  }
}

sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
import com.amazonaws.AmazonClientException;
2626
import com.amazonaws.services.s3.AmazonS3;
27-
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
2827
import com.amazonaws.services.s3.model.AmazonS3Exception;
2928
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
3029
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -62,11 +61,9 @@
6261
import java.util.regex.Pattern;
6362
import java.util.stream.Collectors;
6463
import org.apache.beam.sdk.io.FileSystem;
65-
import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory;
6664
import org.apache.beam.sdk.io.aws.options.S3Options;
6765
import org.apache.beam.sdk.io.fs.CreateOptions;
6866
import org.apache.beam.sdk.io.fs.MatchResult;
69-
import org.apache.beam.sdk.util.InstanceBuilder;
7067
import org.apache.beam.sdk.util.MoreFutures;
7168
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
7269
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
@@ -84,7 +81,11 @@
8481
import org.slf4j.Logger;
8582
import org.slf4j.LoggerFactory;
8683

87-
/** {@link FileSystem} implementation for Amazon S3. */
84+
/**
85+
* {@link FileSystem} implementation for storage systems that use the S3 protocol.
86+
*
87+
* @see S3FileSystemSchemeRegistrar
88+
*/
8889
@SuppressWarnings({
8990
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
9091
})
@@ -104,30 +105,29 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
104105

105106
// Non-final for testing.
106107
private Supplier<AmazonS3> amazonS3;
107-
private final S3Options options;
108+
private final S3FileSystemConfiguration config;
108109
private final ListeningExecutorService executorService;
109110

110-
S3FileSystem(S3Options options) {
111-
this.options = checkNotNull(options, "options");
112-
AmazonS3ClientBuilder builder =
113-
InstanceBuilder.ofType(S3ClientBuilderFactory.class)
114-
.fromClass(options.getS3ClientFactoryClass())
115-
.build()
116-
.createBuilder(options);
111+
S3FileSystem(S3FileSystemConfiguration config) {
112+
this.config = checkNotNull(config, "config");
117113
// The Supplier is to make sure we don't call .build() unless we are actually using S3.
118-
amazonS3 = Suppliers.memoize(builder::build);
114+
amazonS3 = Suppliers.memoize(config.getS3ClientBuilder()::build);
119115

120-
checkNotNull(options.getS3StorageClass(), "storageClass");
121-
checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
116+
checkNotNull(config.getS3StorageClass(), "storageClass");
117+
checkArgument(config.getS3ThreadPoolSize() > 0, "threadPoolSize");
122118
executorService =
123119
MoreExecutors.listeningDecorator(
124120
Executors.newFixedThreadPool(
125-
options.getS3ThreadPoolSize(), new ThreadFactoryBuilder().setDaemon(true).build()));
121+
config.getS3ThreadPoolSize(), new ThreadFactoryBuilder().setDaemon(true).build()));
122+
}
123+
124+
S3FileSystem(S3Options options) {
125+
this(S3FileSystemConfiguration.fromS3Options(options).build());
126126
}
127127

128128
@Override
129129
protected String getScheme() {
130-
return S3ResourceId.SCHEME;
130+
return config.getScheme();
131131
}
132132

133133
@VisibleForTesting
@@ -327,7 +327,8 @@ private ExpandedGlob expandGlob(S3ResourceId glob) {
327327
// Filter against regex.
328328
if (wildcardRegexp.matcher(objectSummary.getKey()).matches()) {
329329
S3ResourceId expandedPath =
330-
S3ResourceId.fromComponents(objectSummary.getBucketName(), objectSummary.getKey())
330+
S3ResourceId.fromComponents(
331+
glob.getScheme(), objectSummary.getBucketName(), objectSummary.getKey())
331332
.withSize(objectSummary.getSize())
332333
.withLastModified(objectSummary.getLastModified());
333334
LOG.debug("Expanded S3 object path {}", expandedPath);
@@ -364,7 +365,7 @@ private List<MatchResult> matchNonGlobPaths(Collection<S3ResourceId> paths) thro
364365
private ObjectMetadata getObjectMetadata(S3ResourceId s3ResourceId) throws AmazonClientException {
365366
GetObjectMetadataRequest request =
366367
new GetObjectMetadataRequest(s3ResourceId.getBucket(), s3ResourceId.getKey());
367-
request.setSSECustomerKey(options.getSSECustomerKey());
368+
request.setSSECustomerKey(config.getSSECustomerKey());
368369
return amazonS3.get().getObjectMetadata(request);
369370
}
370371

@@ -411,12 +412,12 @@ private static MatchResult.Metadata createBeamMetadata(
411412
@Override
412413
protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions createOptions)
413414
throws IOException {
414-
return new S3WritableByteChannel(amazonS3.get(), resourceId, createOptions.mimeType(), options);
415+
return new S3WritableByteChannel(amazonS3.get(), resourceId, createOptions.mimeType(), config);
415416
}
416417

417418
@Override
418419
protected ReadableByteChannel open(S3ResourceId resourceId) throws IOException {
419-
return new S3ReadableSeekableByteChannel(amazonS3.get(), resourceId, options);
420+
return new S3ReadableSeekableByteChannel(amazonS3.get(), resourceId, config);
420421
}
421422

422423
@Override
@@ -469,9 +470,9 @@ CopyObjectResult atomicCopy(
469470
destinationPath.getBucket(),
470471
destinationPath.getKey());
471472
copyObjectRequest.setNewObjectMetadata(sourceObjectMetadata);
472-
copyObjectRequest.setStorageClass(options.getS3StorageClass());
473-
copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
474-
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
473+
copyObjectRequest.setStorageClass(config.getS3StorageClass());
474+
copyObjectRequest.setSourceSSECustomerKey(config.getSSECustomerKey());
475+
copyObjectRequest.setDestinationSSECustomerKey(config.getSSECustomerKey());
475476
return amazonS3.get().copyObject(copyObjectRequest);
476477
}
477478

@@ -481,9 +482,9 @@ CompleteMultipartUploadResult multipartCopy(
481482
throws AmazonClientException {
482483
InitiateMultipartUploadRequest initiateUploadRequest =
483484
new InitiateMultipartUploadRequest(destinationPath.getBucket(), destinationPath.getKey())
484-
.withStorageClass(options.getS3StorageClass())
485+
.withStorageClass(config.getS3StorageClass())
485486
.withObjectMetadata(sourceObjectMetadata);
486-
initiateUploadRequest.setSSECustomerKey(options.getSSECustomerKey());
487+
initiateUploadRequest.setSSECustomerKey(config.getSSECustomerKey());
487488

488489
InitiateMultipartUploadResult initiateUploadResult =
489490
amazonS3.get().initiateMultipartUpload(initiateUploadRequest);
@@ -503,14 +504,14 @@ CompleteMultipartUploadResult multipartCopy(
503504
.withDestinationKey(destinationPath.getKey())
504505
.withUploadId(uploadId)
505506
.withPartNumber(1);
506-
copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
507-
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
507+
copyPartRequest.setSourceSSECustomerKey(config.getSSECustomerKey());
508+
copyPartRequest.setDestinationSSECustomerKey(config.getSSECustomerKey());
508509

509510
CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
510511
eTags.add(copyPartResult.getPartETag());
511512
} else {
512513
long bytePosition = 0;
513-
Integer uploadBufferSizeBytes = options.getS3UploadBufferSizeBytes();
514+
Integer uploadBufferSizeBytes = config.getS3UploadBufferSizeBytes();
514515
// Amazon parts are 1-indexed, not zero-indexed.
515516
for (int partNumber = 1; bytePosition < objectSize; partNumber++) {
516517
final CopyPartRequest copyPartRequest =
@@ -523,8 +524,8 @@ CompleteMultipartUploadResult multipartCopy(
523524
.withPartNumber(partNumber)
524525
.withFirstByte(bytePosition)
525526
.withLastByte(Math.min(objectSize - 1, bytePosition + uploadBufferSizeBytes - 1));
526-
copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
527-
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
527+
copyPartRequest.setSourceSSECustomerKey(config.getSSECustomerKey());
528+
copyPartRequest.setDestinationSSECustomerKey(config.getSSECustomerKey());
528529

529530
CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
530531
eTags.add(copyPartResult.getPartETag());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.aws.s3;
19+
20+
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
21+
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
22+
import com.amazonaws.services.s3.model.SSECustomerKey;
23+
import com.google.auto.value.AutoValue;
24+
import javax.annotation.Nullable;
25+
import org.apache.beam.sdk.annotations.Experimental;
26+
import org.apache.beam.sdk.annotations.Experimental.Kind;
27+
import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory;
28+
import org.apache.beam.sdk.io.aws.options.S3Options;
29+
import org.apache.beam.sdk.util.InstanceBuilder;
30+
31+
/**
 * Object used to configure {@link S3FileSystem}.
 *
 * <p>Immutable value type (AutoValue). One instance exists per registered URI scheme; it carries
 * everything {@link S3FileSystem} needs: the scheme itself, storage/transfer tuning, SSE settings,
 * and the client builder used to construct the underlying {@code AmazonS3} client.
 *
 * @see S3Options
 * @see S3FileSystemSchemeRegistrar
 */
@AutoValue
@Experimental(Kind.FILESYSTEM)
public abstract class S3FileSystemConfiguration {
  // Re-exported so callers need not depend on the S3Options factory class directly.
  public static final int MINIMUM_UPLOAD_BUFFER_SIZE_BYTES =
      S3Options.S3UploadBufferSizeBytesFactory.MINIMUM_UPLOAD_BUFFER_SIZE_BYTES;

  /** The URI scheme used by resources on this filesystem, e.g. "s3". */
  public abstract String getScheme();

  /** The AWS S3 storage class used for creating S3 objects. */
  public abstract String getS3StorageClass();

  /** Size of S3 upload chunks, in bytes. */
  public abstract int getS3UploadBufferSizeBytes();

  /** Thread pool size, limiting the max concurrent S3 operations. */
  public abstract int getS3ThreadPoolSize();

  /** Algorithm for SSE-S3 encryption, e.g. AES256. Null when SSE-S3 is not used. */
  public abstract @Nullable String getSSEAlgorithm();

  /** SSE key for SSE-C encryption, e.g. a base64 encoded key and the algorithm. Null when unused. */
  public abstract @Nullable SSECustomerKey getSSECustomerKey();

  /** KMS key id for SSE-KMS encryption, e.g. "arn:aws:kms:...". Null when unused. */
  public abstract @Nullable SSEAwsKeyManagementParams getSSEAwsKeyManagementParams();

  /** Builder used to create the {@code AmazonS3Client}. Invoked lazily by {@link S3FileSystem}. */
  public abstract AmazonS3ClientBuilder getS3ClientBuilder();

  /** Creates a new uninitialized {@link Builder}. */
  public static Builder builder() {
    return new AutoValue_S3FileSystemConfiguration.Builder();
  }

  /** Creates a new {@link Builder} with values initialized by this instance's properties. */
  public abstract Builder toBuilder();

  /**
   * Creates a new {@link Builder} with values initialized by the properties of {@code s3Options}.
   *
   * <p>The scheme defaults to "s3"; callers registering a different scheme should override it via
   * {@link Builder#setScheme} before building.
   */
  public static Builder fromS3Options(S3Options s3Options) {
    return builder()
        .setScheme("s3")
        .setS3StorageClass(s3Options.getS3StorageClass())
        .setS3UploadBufferSizeBytes(s3Options.getS3UploadBufferSizeBytes())
        .setS3ThreadPoolSize(s3Options.getS3ThreadPoolSize())
        .setSSEAlgorithm(s3Options.getSSEAlgorithm())
        .setSSECustomerKey(s3Options.getSSECustomerKey())
        .setSSEAwsKeyManagementParams(s3Options.getSSEAwsKeyManagementParams())
        .setS3ClientBuilder(getBuilder(s3Options));
  }

  /**
   * Creates a new {@link AmazonS3ClientBuilder} as specified by {@code s3Options}, using the
   * {@link S3ClientBuilderFactory} class named in {@code s3Options.getS3ClientFactoryClass()}.
   */
  public static AmazonS3ClientBuilder getBuilder(S3Options s3Options) {
    return InstanceBuilder.ofType(S3ClientBuilderFactory.class)
        .fromClass(s3Options.getS3ClientFactoryClass())
        .build()
        .createBuilder(s3Options);
  }

  /** Mutable builder for {@link S3FileSystemConfiguration}; obtain via {@link #builder()}. */
  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setScheme(String value);

    public abstract Builder setS3StorageClass(String value);

    public abstract Builder setS3UploadBufferSizeBytes(int value);

    public abstract Builder setS3ThreadPoolSize(int value);

    public abstract Builder setSSEAlgorithm(@Nullable String value);

    public abstract Builder setSSECustomerKey(@Nullable SSECustomerKey value);

    public abstract Builder setSSEAwsKeyManagementParams(@Nullable SSEAwsKeyManagementParams value);

    public abstract Builder setS3ClientBuilder(AmazonS3ClientBuilder value);

    public abstract S3FileSystemConfiguration build();
  }
}

sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemRegistrar.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,39 @@
2020
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
2121

2222
import com.google.auto.service.AutoService;
23+
import java.util.Map;
24+
import java.util.ServiceLoader;
25+
import java.util.stream.Collectors;
2326
import javax.annotation.Nonnull;
2427
import org.apache.beam.sdk.annotations.Experimental;
2528
import org.apache.beam.sdk.annotations.Experimental.Kind;
2629
import org.apache.beam.sdk.io.FileSystem;
2730
import org.apache.beam.sdk.io.FileSystemRegistrar;
28-
import org.apache.beam.sdk.io.aws.options.S3Options;
2931
import org.apache.beam.sdk.options.PipelineOptions;
30-
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
32+
import org.apache.beam.sdk.util.common.ReflectHelpers;
33+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
3134

32-
/** {@link AutoService} registrar for the {@link S3FileSystem}. */
35+
/**
 * {@link AutoService} registrar for the {@link S3FileSystem}.
 *
 * <p>Creates instances of {@link S3FileSystem} for each scheme registered with a {@link
 * S3FileSystemSchemeRegistrar}.
 */
@AutoService(FileSystemRegistrar.class)
@Experimental(Kind.FILESYSTEM)
public class S3FileSystemRegistrar implements FileSystemRegistrar {

  /**
   * Discovers every {@link S3FileSystemSchemeRegistrar} on the classpath via {@link ServiceLoader}
   * and returns one {@link S3FileSystem} per registered configuration, keyed by scheme.
   *
   * @param options pipeline options; must be non-null (set via {@code
   *     FileSystems.setDefaultPipelineOptions()})
   */
  @Override
  public Iterable<FileSystem<?>> fromOptions(@Nonnull PipelineOptions options) {
    checkNotNull(options, "Expect the runner have called FileSystems.setDefaultPipelineOptions().");
    Map<String, FileSystem<?>> fileSystems =
        Streams.stream(
                // ReflectHelpers picks the classloader that can see user code on all runners.
                ServiceLoader.load(
                    S3FileSystemSchemeRegistrar.class, ReflectHelpers.findClassLoader()))
            .flatMap(r -> Streams.stream(r.fromOptions(options)))
            .map(S3FileSystem::new)
            // Collectors.toMap throws IllegalStateException if two registrars supply the same
            // scheme, which surfaces misconfiguration early rather than silently shadowing one.
            .collect(Collectors.toMap(S3FileSystem::getScheme, f -> (FileSystem<?>) f));
    return fileSystems.values();
  }
}

0 commit comments

Comments
 (0)