Skip to content

Commit 30a5047

Browse files
committed
Use multi-part upload to stream output
1 parent 0e22597 commit 30a5047

File tree

3 files changed

+328
-45
lines changed

3 files changed

+328
-45
lines changed

src/main/java/ch/myniva/gradle/caching/s3/internal/AwsS3BuildCacheService.java

+5-20
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,16 @@
1818

1919
import com.amazonaws.services.s3.AmazonS3;
2020
import com.amazonaws.services.s3.model.ObjectMetadata;
21-
import com.amazonaws.services.s3.model.PutObjectRequest;
2221
import com.amazonaws.services.s3.model.S3Object;
23-
import java.io.ByteArrayInputStream;
24-
import java.io.ByteArrayOutputStream;
25-
import java.io.IOException;
26-
import java.io.InputStream;
27-
28-
import com.amazonaws.services.s3.model.StorageClass;
2922
import org.gradle.caching.BuildCacheEntryReader;
3023
import org.gradle.caching.BuildCacheEntryWriter;
3124
import org.gradle.caching.BuildCacheException;
3225
import org.gradle.caching.BuildCacheKey;
3326
import org.gradle.caching.BuildCacheService;
3427
import org.slf4j.Logger;
3528
import org.slf4j.LoggerFactory;
29+
import java.io.IOException;
30+
import java.io.InputStream;
3631

3732
public class AwsS3BuildCacheService implements BuildCacheService {
3833

@@ -83,25 +78,15 @@ public void store(BuildCacheKey key, BuildCacheEntryWriter writer) {
8378
ObjectMetadata meta = new ObjectMetadata();
8479
meta.setContentType(BUILD_CACHE_CONTENT_TYPE);
8580

86-
try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
81+
try (S3OutputStream os = new S3OutputStream(s3, bucketName, bucketPath, reducedRedundancy)) {
8782
writer.writeTo(os);
88-
meta.setContentLength(os.size());
89-
try (InputStream is = new ByteArrayInputStream(os.toByteArray())) {
90-
PutObjectRequest request = getPutObjectRequest(bucketPath, meta, is);
91-
if(this.reducedRedundancy) {
92-
request.withStorageClass(StorageClass.ReducedRedundancy);
93-
}
94-
s3.putObject(request);
95-
}
83+
meta.setContentLength(os.getBytesWritten());
84+
9685
} catch (IOException e) {
9786
throw new BuildCacheException("Error while storing cache object in S3 bucket", e);
9887
}
9988
}
10089

101-
protected PutObjectRequest getPutObjectRequest(String bucketPath, ObjectMetadata meta, InputStream is) {
102-
return new PutObjectRequest(bucketName, bucketPath, is, meta);
103-
}
104-
10590
@Override
10691
public void close() throws IOException {
10792
// The AWS S3 client does not need to be closed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package ch.myniva.gradle.caching.s3.internal;
2+
3+
import com.amazonaws.services.s3.AmazonS3;
4+
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
5+
import com.amazonaws.services.s3.model.CannedAccessControlList;
6+
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
7+
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
8+
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
9+
import com.amazonaws.services.s3.model.ObjectMetadata;
10+
import com.amazonaws.services.s3.model.PartETag;
11+
import com.amazonaws.services.s3.model.PutObjectRequest;
12+
import com.amazonaws.services.s3.model.StorageClass;
13+
import com.amazonaws.services.s3.model.UploadPartRequest;
14+
import com.amazonaws.services.s3.model.UploadPartResult;
15+
import java.io.ByteArrayInputStream;
16+
import java.io.OutputStream;
17+
import java.util.ArrayList;
18+
import java.util.List;
19+
20+
//from https://gist.github.com/blagerweij/ad1dbb7ee2fff8bcffd372815ad310eb
21+
public class S3OutputStream extends OutputStream {
22+
23+
/** Default chunk size is 1MB */
24+
protected static final int BUFFER_SIZE = 1000000;
25+
26+
/** The bucket-name on Amazon S3 */
27+
private final String bucket;
28+
29+
/** The path (key) name within the bucket */
30+
private final String path;
31+
32+
/** The temporary buffer used for storing the chunks */
33+
private final byte[] buf;
34+
35+
/** The position in the buffer */
36+
private int position;
37+
38+
/** Amazon S3 client. TODO: support KMS */
39+
private final AmazonS3 s3Client;
40+
41+
private boolean reducedRedundancy = false;
42+
43+
/** The unique id for this upload */
44+
private String uploadId;
45+
46+
/** Collection of the etags for the parts that have been uploaded */
47+
private final List<PartETag> etags;
48+
49+
/** indicates whether the stream is still open / valid */
50+
private boolean open;
51+
52+
private long bytesWritten = 0L;
53+
54+
/**
55+
* Creates a new S3 OutputStream
56+
* @param s3Client the AmazonS3 client
57+
* @param bucket name of the bucket
58+
* @param path path within the bucket
59+
* @param reducedRedundancy whether to set storage class for uploads to reduced redundancy
60+
*/
61+
public S3OutputStream(AmazonS3 s3Client, String bucket, String path, boolean reducedRedundancy) {
62+
this.s3Client = s3Client;
63+
this.bucket = bucket;
64+
this.path = path;
65+
this.buf = new byte[BUFFER_SIZE];
66+
this.position = 0;
67+
this.etags = new ArrayList<>();
68+
this.open = true;
69+
this.reducedRedundancy = reducedRedundancy;
70+
}
71+
72+
/**
73+
* Write an array to the S3 output stream.
74+
*
75+
* @param b the byte-array to append
76+
*/
77+
@Override
78+
public void write(byte[] b) {
79+
write(b,0,b.length);
80+
}
81+
82+
/**
83+
* Writes an array to the S3 Output Stream
84+
*
85+
* @param byteArray the array to write
86+
* @param o the offset into the array
87+
* @param l the number of bytes to write
88+
*/
89+
@Override
90+
public void write(final byte[] byteArray, final int o, final int l) {
91+
this.assertOpen();
92+
int ofs = o, len = l;
93+
int size;
94+
while (len > (size = this.buf.length - position)) {
95+
System.arraycopy(byteArray, ofs, this.buf, this.position, size);
96+
this.position += size;
97+
flushBufferAndRewind();
98+
ofs += size;
99+
len -= size;
100+
}
101+
System.arraycopy(byteArray, ofs, this.buf, this.position, len);
102+
this.position += len;
103+
}
104+
105+
/**
106+
* Flushes the buffer by uploading a part to S3.
107+
*/
108+
@Override
109+
public synchronized void flush() {
110+
this.assertOpen();
111+
}
112+
113+
protected void flushBufferAndRewind() {
114+
if (uploadId == null) {
115+
final InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(this.bucket, this.path)
116+
.withStorageClass(reducedRedundancy ? StorageClass.ReducedRedundancy : StorageClass.Standard)
117+
.withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
118+
InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
119+
this.uploadId = initResponse.getUploadId();
120+
}
121+
uploadPart();
122+
this.position = 0;
123+
}
124+
125+
protected void uploadPart() {
126+
UploadPartResult uploadResult = this.s3Client.uploadPart(new UploadPartRequest()
127+
.withBucketName(this.bucket)
128+
.withKey(this.path)
129+
.withUploadId(this.uploadId)
130+
.withInputStream(new ByteArrayInputStream(buf,0,this.position))
131+
.withPartNumber(this.etags.size() + 1)
132+
.withPartSize(this.position));
133+
this.etags.add(uploadResult.getPartETag());
134+
//increase bytes written to match
135+
this.bytesWritten += this.position;
136+
}
137+
138+
@Override
139+
public void close() {
140+
if (this.open) {
141+
this.open = false;
142+
if (this.uploadId != null) {
143+
if (this.position > 0) {
144+
uploadPart();
145+
}
146+
this.s3Client.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
147+
}
148+
else {
149+
final ObjectMetadata metadata = new ObjectMetadata();
150+
metadata.setContentLength(this.position);
151+
final PutObjectRequest request = new PutObjectRequest(this.bucket, this.path, new ByteArrayInputStream(this.buf, 0, this.position), metadata)
152+
.withStorageClass(reducedRedundancy ? StorageClass.ReducedRedundancy : StorageClass.Standard)
153+
.withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
154+
this.s3Client.putObject(request);
155+
}
156+
}
157+
}
158+
159+
public void cancel() {
160+
this.open = false;
161+
if (this.uploadId != null) {
162+
this.s3Client.abortMultipartUpload(new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
163+
}
164+
}
165+
166+
@Override
167+
public void write(int b) {
168+
this.assertOpen();
169+
if (position >= this.buf.length) {
170+
flushBufferAndRewind();
171+
}
172+
this.buf[position++] = (byte)b;
173+
}
174+
175+
public long getBytesWritten() {
176+
return bytesWritten;
177+
}
178+
179+
private void assertOpen() {
180+
if (!this.open) {
181+
throw new IllegalStateException("Closed");
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)