Skip to content

Commit 99a25fc

Browse files
committed
Integrate with latest CRT pause/resume fix
1 parent b46b71d commit 99a25fc

File tree

15 files changed

+160
-240
lines changed

15 files changed

+160
-240
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
<rxjava.version>2.2.21</rxjava.version>
115115
<commons-codec.verion>1.10</commons-codec.verion>
116116
<jmh.version>1.29</jmh.version>
117-
<awscrt.version>0.19.10</awscrt.version>
117+
<awscrt.version>0.20.1</awscrt.version>
118118

119119
<!--Test dependencies -->
120120
<junit5.version>5.8.1</junit5.version>

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,20 @@
2626
import java.time.Duration;
2727
import org.junit.jupiter.api.AfterAll;
2828
import org.junit.jupiter.api.BeforeAll;
29-
import org.junit.jupiter.api.Disabled;
3029
import org.junit.jupiter.api.Test;
3130
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
3231
import software.amazon.awssdk.core.waiters.Waiter;
3332
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
34-
import software.amazon.awssdk.services.s3.S3AsyncClient;
33+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
3534
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
3635
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
3736
import software.amazon.awssdk.testutils.RandomTempFile;
3837
import software.amazon.awssdk.transfer.s3.model.FileUpload;
3938
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
4039
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
41-
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
40+
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
4241
import software.amazon.awssdk.utils.Logger;
4342

44-
// TODO: re-enable tests
45-
@Disabled("Disable tests because they are flaky right now due to crt bug")
4643
public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3IntegrationTestBase {
4744
private static final Logger log = Logger.loggerFor(S3TransferManagerUploadPauseResumeIntegrationTest.class);
4845
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
@@ -73,13 +70,11 @@ void pause_singlePart_shouldResume() {
7370
.source(smallFile)
7471
.build();
7572
FileUpload fileUpload = tm.uploadFile(request);
76-
waitUntilFirstByteBufferDelivered(fileUpload);
73+
waitUntilMultipartUploadExists();
7774
ResumableFileUpload resumableFileUpload = fileUpload.pause();
78-
log.debug(() -> "Paused: " + resumableFileUpload);
75+
log.info(() -> "Paused: " + resumableFileUpload);
7976

80-
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
81-
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
82-
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
77+
validateEmptyResumeToken(resumableFileUpload);
8378

8479
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
8580
resumedUpload.completionFuture().join();
@@ -89,16 +84,17 @@ void pause_singlePart_shouldResume() {
8984
void pause_fileNotChanged_shouldResume() {
9085
UploadFileRequest request = UploadFileRequest.builder()
9186
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
87+
.addTransferListener(LoggingTransferListener.create())
9288
.source(largeFile)
9389
.build();
9490
FileUpload fileUpload = tm.uploadFile(request);
95-
waitUntilFirstByteBufferDelivered(fileUpload);
91+
waitUntilMultipartUploadExists();
9692
ResumableFileUpload resumableFileUpload = fileUpload.pause();
9793
log.info(() -> "Paused: " + resumableFileUpload);
9894

9995
assertThat(resumableFileUpload.multipartUploadId()).isNotEmpty();
10096
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();
101-
assertThat(resumableFileUpload.totalNumOfParts()).isNotEmpty();
97+
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
10298

10399
verifyMultipartUploadIdExists(resumableFileUpload);
104100

@@ -114,30 +110,35 @@ void pauseImmediately_resume_shouldStartFromBeginning() {
114110
.build();
115111
FileUpload fileUpload = tm.uploadFile(request);
116112
ResumableFileUpload resumableFileUpload = fileUpload.pause();
117-
log.debug(() -> "Paused: " + resumableFileUpload);
113+
log.info(() -> "Paused: " + resumableFileUpload);
118114

119-
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
120-
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
121-
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
115+
validateEmptyResumeToken(resumableFileUpload);
122116

123117
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
124118
resumedUpload.completionFuture().join();
125119
}
126120

121+
private static void validateEmptyResumeToken(ResumableFileUpload resumableFileUpload) {
122+
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
123+
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
124+
assertThat(resumableFileUpload.totalParts()).isEmpty();
125+
assertThat(resumableFileUpload.transferredParts()).isEmpty();
126+
}
127+
127128
@Test
128129
void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
129130
UploadFileRequest request = UploadFileRequest.builder()
130131
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
131132
.source(largeFile)
132133
.build();
133134
FileUpload fileUpload = tm.uploadFile(request);
134-
waitUntilFirstByteBufferDelivered(fileUpload);
135+
waitUntilMultipartUploadExists();
135136
ResumableFileUpload resumableFileUpload = fileUpload.pause();
136-
log.debug(() -> "Paused: " + resumableFileUpload);
137+
log.info(() -> "Paused: " + resumableFileUpload);
137138

138139
assertThat(resumableFileUpload.multipartUploadId()).isNotEmpty();
139140
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();
140-
assertThat(resumableFileUpload.totalNumOfParts()).isNotEmpty();
141+
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
141142
verifyMultipartUploadIdExists(resumableFileUpload);
142143

143144
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
@@ -162,14 +163,14 @@ private void verifyMultipartUploadIdNotExist(ResumableFileUpload resumableFileUp
162163
.hasCauseInstanceOf(NoSuchUploadException.class);
163164
}
164165

165-
private static void waitUntilFirstByteBufferDelivered(FileUpload upload) {
166-
Waiter<TransferProgressSnapshot> waiter = Waiter.builder(TransferProgressSnapshot.class)
167-
.addAcceptor(WaiterAcceptor.successOnResponseAcceptor(r -> r.transferredBytes() > 0))
166+
private static void waitUntilMultipartUploadExists() {
167+
Waiter<ListMultipartUploadsResponse> waiter = Waiter.builder(ListMultipartUploadsResponse.class)
168+
.addAcceptor(WaiterAcceptor.successOnResponseAcceptor(ListMultipartUploadsResponse::hasUploads))
168169
.addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(r -> true))
169170
.overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(1))
170-
.maxAttempts(Integer.MAX_VALUE)
171+
.maxAttempts(10)
171172
.backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofMillis(100))))
172173
.build();
173-
waiter.run(() -> upload.progress().snapshot());
174+
waiter.run(() -> s3.listMultipartUploads(l -> l.bucket(BUCKET)));
174175
}
175176
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.awssdk.core.exception.SdkClientException;
3737
import software.amazon.awssdk.core.exception.SdkException;
3838
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
39+
import software.amazon.awssdk.crt.s3.ResumeToken;
3940
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
4041
import software.amazon.awssdk.services.s3.S3AsyncClient;
4142
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
@@ -60,7 +61,6 @@
6061
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
6162
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
6263
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
63-
import software.amazon.awssdk.transfer.s3.internal.serialization.CrtUploadResumeToken;
6464
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
6565
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
6666
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
@@ -235,14 +235,10 @@ public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
235235
private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
236236
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
237237
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
238-
239-
CrtUploadResumeToken token = new CrtUploadResumeToken(resumableFileUpload.totalNumOfParts().getAsLong(),
240-
resumableFileUpload.partSizeInBytes().getAsLong(),
241-
resumableFileUpload.multipartUploadId().orElse(null));
242-
String marshalledToken = token.marshallResumeToken();
238+
ResumeToken resumeToken = crtResumeToken(resumableFileUpload);
243239

244240
Consumer<SdkHttpExecutionAttributes.Builder> attachResumeToken =
245-
b -> b.put(CRT_PAUSE_RESUME_TOKEN, marshalledToken);
241+
b -> b.put(CRT_PAUSE_RESUME_TOKEN, resumeToken);
246242

247243
PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);
248244

@@ -251,6 +247,14 @@ private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
251247
.build());
252248
}
253249

250+
private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUpload) {
251+
return new ResumeToken(new ResumeToken.PutResumeTokenBuilder()
252+
.withNumPartsCompleted(resumableFileUpload.transferredParts().orElse(0L))
253+
.withTotalNumParts(resumableFileUpload.totalParts().orElse(0L))
254+
.withPartSize(resumableFileUpload.partSizeInBytes().getAsLong())
255+
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
256+
}
257+
254258
private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
255259
boolean noResumeToken) {
256260
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
@@ -301,7 +305,7 @@ private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload,
301305
}
302306

303307
private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
304-
return resumableFileUpload.totalNumOfParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
308+
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
305309
}
306310

307311
private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileUpload.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import software.amazon.awssdk.annotations.SdkInternalApi;
2222
import software.amazon.awssdk.crt.CrtRuntimeException;
23+
import software.amazon.awssdk.crt.s3.ResumeToken;
2324
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
24-
import software.amazon.awssdk.transfer.s3.internal.serialization.CrtUploadResumeToken;
2525
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
2626
import software.amazon.awssdk.transfer.s3.model.FileUpload;
2727
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
@@ -66,10 +66,10 @@ private ResumableFileUpload doPause() {
6666
.build();
6767
}
6868

69-
completionFuture.cancel(true);
69+
7070
Instant fileLastModified = Instant.ofEpochMilli(sourceFile
7171
.lastModified());
72-
String token = null;
72+
ResumeToken token = null;
7373
try {
7474
token = observable.pause();
7575
} catch (CrtRuntimeException exception) {
@@ -79,6 +79,7 @@ private ResumableFileUpload doPause() {
7979
}
8080
}
8181

82+
completionFuture.cancel(true);
8283
// Upload hasn't started yet, or it's a single object upload
8384
if (token == null) {
8485
return ResumableFileUpload.builder()
@@ -88,12 +89,11 @@ private ResumableFileUpload doPause() {
8889
.build();
8990
}
9091

91-
CrtUploadResumeToken pauseResumeToken = CrtUploadResumeToken.unmarshallResumeToken(token);
92-
9392
return ResumableFileUpload.builder()
94-
.multipartUploadId(pauseResumeToken.multipartUploadId())
95-
.totalNumOfParts(pauseResumeToken.totalNumOfParts())
96-
.partSizeInBytes(pauseResumeToken.partSizeInBytes())
93+
.multipartUploadId(token.getUploadId())
94+
.totalParts(token.getTotalNumParts())
95+
.transferredParts(token.getNumPartsCompleted())
96+
.partSizeInBytes(token.getPartSize())
9797
.fileLastModified(fileLastModified)
9898
.fileLength(sourceFile.length())
9999
.uploadFileRequest(request)

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/serialization/CrtUploadResumeToken.java

Lines changed: 0 additions & 108 deletions
This file was deleted.

0 commit comments

Comments
 (0)