1
1
package com .aol .micro .server .s3 ;
2
2
3
+ import java .io .ByteArrayInputStream ;
4
+ import java .io .File ;
5
+ import java .io .IOException ;
6
+ import java .io .InputStream ;
7
+ import java .nio .file .FileSystems ;
8
+ import java .nio .file .Files ;
3
9
import java .util .ArrayList ;
4
10
import java .util .List ;
5
11
import java .util .function .Function ;
6
- import java .util .stream . Stream ;
12
+ import java .util .function . Supplier ;
7
13
14
+ import org .apache .commons .io .FileUtils ;
8
15
import org .springframework .beans .factory .annotation .Autowired ;
16
+ import org .springframework .beans .factory .annotation .Value ;
9
17
import org .springframework .stereotype .Component ;
10
18
19
+ import com .amazonaws .AmazonClientException ;
20
+ import com .amazonaws .AmazonServiceException ;
11
21
import com .amazonaws .services .s3 .AmazonS3Client ;
12
22
import com .amazonaws .services .s3 .model .DeleteObjectsRequest ;
13
23
import com .amazonaws .services .s3 .model .DeleteObjectsRequest .KeyVersion ;
14
24
import com .amazonaws .services .s3 .model .ListObjectsRequest ;
15
25
import com .amazonaws .services .s3 .model .ObjectListing ;
16
26
import com .amazonaws .services .s3 .model .S3ObjectSummary ;
27
+ import com .amazonaws .services .s3 .transfer .Download ;
28
+ import com .amazonaws .services .s3 .transfer .TransferManager ;
17
29
import com .aol .cyclops .control .ReactiveSeq ;
30
+ import com .aol .cyclops .util .ExceptionSoftener ;
18
31
19
32
@ Component
20
33
public class S3Utils {
21
34
22
35
private final AmazonS3Client client ;
36
+ private final TransferManager transferManager ;
37
+ private final String tmpDirectory ;
23
38
24
39
/**
 * Creates the S3 helper with its collaborators injected by Spring.
 *
 * @param client          S3 client used for listing and deleting objects
 * @param transferManager transfer manager used by the getInputStream methods to download objects
 * @param tmpDirectory    value of the <i>s3.tmp.dir</i> property; the placeholder default is
 *                        <code>#{null}</code>, so this is {@code null} when the property is not set
 */
@Autowired
public S3Utils(AmazonS3Client client, TransferManager transferManager,
        @Value("${s3.tmp.dir:#{null}}") String tmpDirectory) {
    this.client = client;
    this.transferManager = transferManager;
    this.tmpDirectory = tmpDirectory;
}
28
46
47
+
48
+ /**
49
+ * Method returns list of all <b>S3ObjectSummary</b> objects, subject to <i>req</i> parameters.
50
+ * Multiple S3 calls will be performed if there are more than 1000 elements.
51
+ * @param req - ListObjectRequest to be used.
52
+ * @return List of S3ObjectSummary from the bucket.
53
+ */
29
54
public List <S3ObjectSummary > getAllSummaries (ListObjectsRequest req ) {
30
55
List <S3ObjectSummary > result = new ArrayList <>();
31
56
String marker = null ;
@@ -36,16 +61,27 @@ public List<S3ObjectSummary> getAllSummaries(ListObjectsRequest req) {
36
61
marker = listing .getNextMarker ();
37
62
result .addAll (listing .getObjectSummaries ());
38
63
} while (listing .isTruncated ());
64
+
39
65
return result ;
40
66
}
41
67
42
- /*
43
- * TODO implement smarter mechanism to reduce number of queries
68
+ /**
69
+ * Method return stream of S3ObjectSummary objects, subject to <i>req</i> parameters
70
+ * Method will perform one query for every 1000 elements (current s3 limitation).
71
+ * It is lazy, so there would be no unnecesarry calls
72
+ * @param req - ListObjectRequest to be used.
73
+ * @param processor - Function that convert S3ObjectSummary to any object
74
+ * @return ReactiveSeq of converted S3Object summary elements.
44
75
*/
45
- public <T > Stream <T > getSummariesStream (ListObjectsRequest req , Function <S3ObjectSummary , T > processor ) {
46
- return getAllSummaries ( req ). stream ( ).map (processor );
76
+ public <T > ReactiveSeq <T > getSummariesStream (ListObjectsRequest req , Function <S3ObjectSummary , T > processor ) {
77
+ return ReactiveSeq . fromIterator ( new S3ObjectSummaryIterator ( client , req )).map (processor );
47
78
}
48
79
80
+ /**
81
+ * Method deletes all <i>objects</i> from <i>bucketName</i> in groups of 1000 elements
82
+ * @param bucketName
83
+ * @param objects
84
+ */
49
85
public void delete (String bucketName , List <KeyVersion > objects ) {
50
86
ReactiveSeq .fromList (objects ).grouped (1000 ).forEach (l -> {
51
87
DeleteObjectsRequest req = new DeleteObjectsRequest (bucketName );
@@ -54,4 +90,47 @@ public void delete(String bucketName, List<KeyVersion> objects) {
54
90
});
55
91
}
56
92
93
+ /**
94
+ * Method returns InputStream from S3Object. Multi-part download is used to get file.
95
+ * s3.tmp.dir property used to store temporary files. You can specify temporary file name by
96
+ * using tempFileSupplier object.
97
+ * @param bucketName
98
+ * @param key -
99
+ * @param tempFileSupplier - Supplier providing temporary filenames
100
+ * @return InputStream of
101
+ * @throws AmazonServiceException
102
+ * @throws AmazonClientException
103
+ * @throws InterruptedException
104
+ * @throws IOException
105
+ */
106
+ public InputStream getInputStream (String bucketName , String key , Supplier <File > tempFileSupplier )
107
+ throws AmazonServiceException , AmazonClientException , InterruptedException , IOException {
108
+ File file = tempFileSupplier .get ();
109
+ try {
110
+ Download download = transferManager .download (bucketName , key , file );
111
+ download .waitForCompletion ();
112
+ return new ByteArrayInputStream (FileUtils .readFileToByteArray (file ));
113
+ } finally {
114
+ file .delete ();
115
+ }
116
+ }
117
+
118
+ /**
119
+ * Method returns InputStream from S3Object. Multi-part download is used to get file.
120
+ * s3.tmp.dir property used to store temporary files.
121
+ * @param bucketName
122
+ * @param key
123
+ * @return
124
+ * @throws AmazonServiceException
125
+ * @throws AmazonClientException
126
+ * @throws InterruptedException
127
+ * @throws IOException
128
+ */
129
+ public InputStream getInputStream (String bucketName , String key )
130
+ throws AmazonServiceException , AmazonClientException , InterruptedException , IOException {
131
+ Supplier <File > tempFileSupplier = ExceptionSoftener .softenSupplier (() -> Files
132
+ .createTempFile (FileSystems .getDefault ().getPath (tmpDirectory ), "micro-s3" , "file" ).toFile ());
133
+ return getInputStream (bucketName , key , tempFileSupplier );
134
+ }
135
+
57
136
}
0 commit comments