24
24
25
25
import com .amazonaws .AmazonClientException ;
26
26
import com .amazonaws .services .s3 .AmazonS3 ;
27
- import com .amazonaws .services .s3 .AmazonS3ClientBuilder ;
28
27
import com .amazonaws .services .s3 .model .AmazonS3Exception ;
29
28
import com .amazonaws .services .s3 .model .CompleteMultipartUploadRequest ;
30
29
import com .amazonaws .services .s3 .model .CompleteMultipartUploadResult ;
62
61
import java .util .regex .Pattern ;
63
62
import java .util .stream .Collectors ;
64
63
import org .apache .beam .sdk .io .FileSystem ;
65
- import org .apache .beam .sdk .io .aws .options .S3ClientBuilderFactory ;
66
64
import org .apache .beam .sdk .io .aws .options .S3Options ;
67
65
import org .apache .beam .sdk .io .fs .CreateOptions ;
68
66
import org .apache .beam .sdk .io .fs .MatchResult ;
69
- import org .apache .beam .sdk .util .InstanceBuilder ;
70
67
import org .apache .beam .sdk .util .MoreFutures ;
71
68
import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .annotations .VisibleForTesting ;
72
69
import org .apache .beam .vendor .guava .v26_0_jre .com .google .common .base .Strings ;
84
81
import org .slf4j .Logger ;
85
82
import org .slf4j .LoggerFactory ;
86
83
87
- /** {@link FileSystem} implementation for Amazon S3. */
84
+ /**
85
+ * {@link FileSystem} implementation for storage systems that use the S3 protocol.
86
+ *
87
+ * @see S3FileSystemSchemeRegistrar
88
+ */
88
89
@ SuppressWarnings ({
89
90
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
90
91
})
@@ -104,30 +105,29 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
104
105
105
106
// Non-final for testing.
106
107
private Supplier <AmazonS3 > amazonS3 ;
107
- private final S3Options options ;
108
+ private final S3FileSystemConfiguration config ;
108
109
private final ListeningExecutorService executorService ;
109
110
110
- S3FileSystem (S3Options options ) {
111
- this .options = checkNotNull (options , "options" );
112
- AmazonS3ClientBuilder builder =
113
- InstanceBuilder .ofType (S3ClientBuilderFactory .class )
114
- .fromClass (options .getS3ClientFactoryClass ())
115
- .build ()
116
- .createBuilder (options );
111
+ S3FileSystem (S3FileSystemConfiguration config ) {
112
+ this .config = checkNotNull (config , "config" );
117
113
// The Supplier is to make sure we don't call .build() unless we are actually using S3.
118
- amazonS3 = Suppliers .memoize (builder ::build );
114
+ amazonS3 = Suppliers .memoize (config . getS3ClientBuilder () ::build );
119
115
120
- checkNotNull (options .getS3StorageClass (), "storageClass" );
121
- checkArgument (options .getS3ThreadPoolSize () > 0 , "threadPoolSize" );
116
+ checkNotNull (config .getS3StorageClass (), "storageClass" );
117
+ checkArgument (config .getS3ThreadPoolSize () > 0 , "threadPoolSize" );
122
118
executorService =
123
119
MoreExecutors .listeningDecorator (
124
120
Executors .newFixedThreadPool (
125
- options .getS3ThreadPoolSize (), new ThreadFactoryBuilder ().setDaemon (true ).build ()));
121
+ config .getS3ThreadPoolSize (), new ThreadFactoryBuilder ().setDaemon (true ).build ()));
122
+ }
123
+
124
+ S3FileSystem (S3Options options ) {
125
+ this (S3FileSystemConfiguration .fromS3Options (options ).build ());
126
126
}
127
127
128
128
@ Override
129
129
protected String getScheme () {
130
- return S3ResourceId . SCHEME ;
130
+ return config . getScheme () ;
131
131
}
132
132
133
133
@ VisibleForTesting
@@ -327,7 +327,8 @@ private ExpandedGlob expandGlob(S3ResourceId glob) {
327
327
// Filter against regex.
328
328
if (wildcardRegexp .matcher (objectSummary .getKey ()).matches ()) {
329
329
S3ResourceId expandedPath =
330
- S3ResourceId .fromComponents (objectSummary .getBucketName (), objectSummary .getKey ())
330
+ S3ResourceId .fromComponents (
331
+ glob .getScheme (), objectSummary .getBucketName (), objectSummary .getKey ())
331
332
.withSize (objectSummary .getSize ())
332
333
.withLastModified (objectSummary .getLastModified ());
333
334
LOG .debug ("Expanded S3 object path {}" , expandedPath );
@@ -364,7 +365,7 @@ private List<MatchResult> matchNonGlobPaths(Collection<S3ResourceId> paths) thro
364
365
private ObjectMetadata getObjectMetadata (S3ResourceId s3ResourceId ) throws AmazonClientException {
365
366
GetObjectMetadataRequest request =
366
367
new GetObjectMetadataRequest (s3ResourceId .getBucket (), s3ResourceId .getKey ());
367
- request .setSSECustomerKey (options .getSSECustomerKey ());
368
+ request .setSSECustomerKey (config .getSSECustomerKey ());
368
369
return amazonS3 .get ().getObjectMetadata (request );
369
370
}
370
371
@@ -411,12 +412,12 @@ private static MatchResult.Metadata createBeamMetadata(
411
412
@ Override
412
413
protected WritableByteChannel create (S3ResourceId resourceId , CreateOptions createOptions )
413
414
throws IOException {
414
- return new S3WritableByteChannel (amazonS3 .get (), resourceId , createOptions .mimeType (), options );
415
+ return new S3WritableByteChannel (amazonS3 .get (), resourceId , createOptions .mimeType (), config );
415
416
}
416
417
417
418
@ Override
418
419
protected ReadableByteChannel open (S3ResourceId resourceId ) throws IOException {
419
- return new S3ReadableSeekableByteChannel (amazonS3 .get (), resourceId , options );
420
+ return new S3ReadableSeekableByteChannel (amazonS3 .get (), resourceId , config );
420
421
}
421
422
422
423
@ Override
@@ -469,9 +470,9 @@ CopyObjectResult atomicCopy(
469
470
destinationPath .getBucket (),
470
471
destinationPath .getKey ());
471
472
copyObjectRequest .setNewObjectMetadata (sourceObjectMetadata );
472
- copyObjectRequest .setStorageClass (options .getS3StorageClass ());
473
- copyObjectRequest .setSourceSSECustomerKey (options .getSSECustomerKey ());
474
- copyObjectRequest .setDestinationSSECustomerKey (options .getSSECustomerKey ());
473
+ copyObjectRequest .setStorageClass (config .getS3StorageClass ());
474
+ copyObjectRequest .setSourceSSECustomerKey (config .getSSECustomerKey ());
475
+ copyObjectRequest .setDestinationSSECustomerKey (config .getSSECustomerKey ());
475
476
return amazonS3 .get ().copyObject (copyObjectRequest );
476
477
}
477
478
@@ -481,9 +482,9 @@ CompleteMultipartUploadResult multipartCopy(
481
482
throws AmazonClientException {
482
483
InitiateMultipartUploadRequest initiateUploadRequest =
483
484
new InitiateMultipartUploadRequest (destinationPath .getBucket (), destinationPath .getKey ())
484
- .withStorageClass (options .getS3StorageClass ())
485
+ .withStorageClass (config .getS3StorageClass ())
485
486
.withObjectMetadata (sourceObjectMetadata );
486
- initiateUploadRequest .setSSECustomerKey (options .getSSECustomerKey ());
487
+ initiateUploadRequest .setSSECustomerKey (config .getSSECustomerKey ());
487
488
488
489
InitiateMultipartUploadResult initiateUploadResult =
489
490
amazonS3 .get ().initiateMultipartUpload (initiateUploadRequest );
@@ -503,14 +504,14 @@ CompleteMultipartUploadResult multipartCopy(
503
504
.withDestinationKey (destinationPath .getKey ())
504
505
.withUploadId (uploadId )
505
506
.withPartNumber (1 );
506
- copyPartRequest .setSourceSSECustomerKey (options .getSSECustomerKey ());
507
- copyPartRequest .setDestinationSSECustomerKey (options .getSSECustomerKey ());
507
+ copyPartRequest .setSourceSSECustomerKey (config .getSSECustomerKey ());
508
+ copyPartRequest .setDestinationSSECustomerKey (config .getSSECustomerKey ());
508
509
509
510
CopyPartResult copyPartResult = amazonS3 .get ().copyPart (copyPartRequest );
510
511
eTags .add (copyPartResult .getPartETag ());
511
512
} else {
512
513
long bytePosition = 0 ;
513
- Integer uploadBufferSizeBytes = options .getS3UploadBufferSizeBytes ();
514
+ Integer uploadBufferSizeBytes = config .getS3UploadBufferSizeBytes ();
514
515
// Amazon parts are 1-indexed, not zero-indexed.
515
516
for (int partNumber = 1 ; bytePosition < objectSize ; partNumber ++) {
516
517
final CopyPartRequest copyPartRequest =
@@ -523,8 +524,8 @@ CompleteMultipartUploadResult multipartCopy(
523
524
.withPartNumber (partNumber )
524
525
.withFirstByte (bytePosition )
525
526
.withLastByte (Math .min (objectSize - 1 , bytePosition + uploadBufferSizeBytes - 1 ));
526
- copyPartRequest .setSourceSSECustomerKey (options .getSSECustomerKey ());
527
- copyPartRequest .setDestinationSSECustomerKey (options .getSSECustomerKey ());
527
+ copyPartRequest .setSourceSSECustomerKey (config .getSSECustomerKey ());
528
+ copyPartRequest .setDestinationSSECustomerKey (config .getSSECustomerKey ());
528
529
529
530
CopyPartResult copyPartResult = amazonS3 .get ().copyPart (copyPartRequest );
530
531
eTags .add (copyPartResult .getPartETag ());
0 commit comments