Skip to content

Commit e02b54b

Browse files
author
Nikita Sapozhnikov
committed
S3ObjectSummary stream improved
1 parent 191c7b2 commit e02b54b

File tree

3 files changed

+96
-10
lines changed

3 files changed

+96
-10
lines changed

micro-s3/src/main/java/com/aol/micro/server/s3/DirectoryCleaner.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.aol.micro.server.s3;
22

3+
import java.io.File;
34
import java.io.IOException;
45
import java.nio.file.FileSystems;
56
import java.nio.file.Files;
@@ -15,18 +16,15 @@
1516
public class DirectoryCleaner {
1617

1718
private final String temporaryDirectory;
18-
private final boolean cleanupOnStart;
1919

2020
@Autowired
21-
public DirectoryCleaner(@Value("${s3.temp.dir:#{null}") String temporaryDirectory,
22-
@Value("${s3.temp.clean.on.start:false") boolean cleanupOnStart) {
21+
public DirectoryCleaner(@Value("${s3.temp.dir:#{null}") String temporaryDirectory) {
2322
this.temporaryDirectory = temporaryDirectory;
24-
this.cleanupOnStart = cleanupOnStart;
2523
}
2624

2725
@PostConstruct
2826
public void clean() throws IOException {
29-
if (cleanupOnStart) {
27+
if (temporaryDirectory != null && new File(temporaryDirectory).exists()) {
3028
Path directory = FileSystems.getDefault().getPath(temporaryDirectory);
3129
Files.walkFileTree(directory, new CleanupFileVisitor(directory));
3230
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.aol.micro.server.s3;
2+
3+
import java.util.Iterator;
4+
5+
import com.amazonaws.services.s3.AmazonS3Client;
6+
import com.amazonaws.services.s3.model.ListObjectsRequest;
7+
import com.amazonaws.services.s3.model.ObjectListing;
8+
import com.amazonaws.services.s3.model.S3ObjectSummary;
9+
10+
public class S3ObjectSummaryIterator implements Iterator<S3ObjectSummary> {
11+
12+
private final AmazonS3Client client;
13+
private ListObjectsRequest req;
14+
private Iterator<S3ObjectSummary> iterator;
15+
private boolean empty = true;
16+
public S3ObjectSummaryIterator(AmazonS3Client client, ListObjectsRequest req) {
17+
this.client = client;
18+
this.req = req;
19+
updateIterator();
20+
}
21+
22+
@Override
23+
public boolean hasNext() {
24+
if(iterator.hasNext()) {
25+
return true;
26+
} else if(!empty){
27+
updateIterator();
28+
return iterator.hasNext();
29+
} else {
30+
return false;
31+
}
32+
}
33+
34+
private void updateIterator() {
35+
if(iterator == null || !iterator.hasNext()) {
36+
ObjectListing listing = client.listObjects(req);
37+
req = req.withMarker(listing.getNextMarker());
38+
empty = !listing.isTruncated();
39+
iterator = listing.getObjectSummaries().iterator();
40+
}
41+
}
42+
43+
@Override
44+
public S3ObjectSummary next() {
45+
updateIterator();
46+
return iterator.next();
47+
}
48+
49+
}

micro-s3/src/main/java/com/aol/micro/server/s3/S3Utils.java

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,50 @@
11
package com.aol.micro.server.s3;
22

3+
import java.io.ByteArrayInputStream;
4+
import java.io.File;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.nio.file.FileSystems;
8+
import java.nio.file.Files;
39
import java.util.ArrayList;
410
import java.util.List;
511
import java.util.function.Function;
12+
import java.util.function.Supplier;
613
import java.util.stream.Stream;
14+
import java.util.stream.StreamSupport;
715

16+
import org.apache.commons.io.FileUtils;
17+
import org.jooq.lambda.tuple.Tuple;
818
import org.springframework.beans.factory.annotation.Autowired;
19+
import org.springframework.beans.factory.annotation.Value;
920
import org.springframework.stereotype.Component;
1021

22+
import com.amazonaws.AmazonClientException;
23+
import com.amazonaws.AmazonServiceException;
1124
import com.amazonaws.services.s3.AmazonS3Client;
1225
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
1326
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
1427
import com.amazonaws.services.s3.model.ListObjectsRequest;
1528
import com.amazonaws.services.s3.model.ObjectListing;
1629
import com.amazonaws.services.s3.model.S3ObjectSummary;
30+
import com.amazonaws.services.s3.transfer.Download;
31+
import com.amazonaws.services.s3.transfer.TransferManager;
1732
import com.aol.cyclops.control.ReactiveSeq;
33+
import com.aol.cyclops.util.ExceptionSoftener;
1834

1935
@Component
2036
public class S3Utils {
2137

2238
private final AmazonS3Client client;
39+
private final TransferManager transferManager;
40+
private final String tmpDirectory;
2341

2442
@Autowired
25-
public S3Utils(AmazonS3Client client) {
43+
public S3Utils(AmazonS3Client client, TransferManager transferManager,
44+
@Value("${s3.tmp.dir:#{null}}") String tmpDirectory) {
2645
this.client = client;
46+
this.transferManager = transferManager;
47+
this.tmpDirectory = tmpDirectory;
2748
}
2849

2950
public List<S3ObjectSummary> getAllSummaries(ListObjectsRequest req) {
@@ -36,14 +57,13 @@ public List<S3ObjectSummary> getAllSummaries(ListObjectsRequest req) {
3657
marker = listing.getNextMarker();
3758
result.addAll(listing.getObjectSummaries());
3859
} while (listing.isTruncated());
60+
3961
return result;
4062
}
4163

42-
/*
43-
* TODO implement smarter mechanism to reduce number of queries
44-
*/
4564
public <T> Stream<T> getSummariesStream(ListObjectsRequest req, Function<S3ObjectSummary, T> processor) {
46-
return getAllSummaries(req).stream().map(processor);
65+
Iterable<S3ObjectSummary> iterable = () -> new S3ObjectSummaryIterator(client, req);
66+
return StreamSupport.stream(iterable.spliterator(), false).map(processor);
4767
}
4868

4969
public void delete(String bucketName, List<KeyVersion> objects) {
@@ -54,4 +74,23 @@ public void delete(String bucketName, List<KeyVersion> objects) {
5474
});
5575
}
5676

77+
public InputStream getInputStream(String bucketName, String key, Supplier<File> tempFileSupplier)
78+
throws AmazonServiceException, AmazonClientException, InterruptedException, IOException {
79+
File file = tempFileSupplier.get();
80+
try {
81+
Download download = transferManager.download(bucketName, key, file);
82+
download.waitForCompletion();
83+
return new ByteArrayInputStream(FileUtils.readFileToByteArray(file));
84+
} finally {
85+
file.delete();
86+
}
87+
}
88+
89+
public InputStream getInputStream(String bucketName, String key)
90+
throws AmazonServiceException, AmazonClientException, InterruptedException, IOException {
91+
Supplier<File> tempFileSupplier = ExceptionSoftener.softenSupplier(() -> Files
92+
.createTempFile(FileSystems.getDefault().getPath(tmpDirectory), "micro-s3", "file").toFile());
93+
return getInputStream(bucketName, key, tempFileSupplier);
94+
}
95+
5796
}

0 commit comments

Comments
 (0)