Skip to content

Commit b257950

Browse files
authored
Integrate with latest CRT pause/resume fix (#3588)
* Integrate with latest CRT pause/resume fix * Bump CRT version
1 parent 22c60a9 commit b257950

File tree

15 files changed

+157
-238
lines changed

15 files changed

+157
-238
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
<rxjava.version>2.2.21</rxjava.version>
115115
<commons-codec.verion>1.10</commons-codec.verion>
116116
<jmh.version>1.29</jmh.version>
117-
<awscrt.version>0.19.10</awscrt.version>
117+
<awscrt.version>0.20.3</awscrt.version>
118118

119119
<!--Test dependencies -->
120120
<junit5.version>5.8.1</junit5.version>

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,23 +26,20 @@
2626
import java.time.Duration;
2727
import org.junit.jupiter.api.AfterAll;
2828
import org.junit.jupiter.api.BeforeAll;
29-
import org.junit.jupiter.api.Disabled;
3029
import org.junit.jupiter.api.Test;
3130
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
3231
import software.amazon.awssdk.core.waiters.Waiter;
3332
import software.amazon.awssdk.core.waiters.WaiterAcceptor;
34-
import software.amazon.awssdk.services.s3.S3AsyncClient;
33+
import software.amazon.awssdk.services.s3.model.ListMultipartUploadsResponse;
3534
import software.amazon.awssdk.services.s3.model.ListPartsResponse;
3635
import software.amazon.awssdk.services.s3.model.NoSuchUploadException;
3736
import software.amazon.awssdk.testutils.RandomTempFile;
3837
import software.amazon.awssdk.transfer.s3.model.FileUpload;
3938
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
4039
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
41-
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
40+
import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
4241
import software.amazon.awssdk.utils.Logger;
4342

44-
// TODO: re-enable tests
45-
@Disabled("Disable tests because they are flaky right now due to crt bug")
4643
public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3IntegrationTestBase {
4744
private static final Logger log = Logger.loggerFor(S3TransferManagerUploadPauseResumeIntegrationTest.class);
4845
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
@@ -73,13 +70,10 @@ void pause_singlePart_shouldResume() {
7370
.source(smallFile)
7471
.build();
7572
FileUpload fileUpload = tm.uploadFile(request);
76-
waitUntilFirstByteBufferDelivered(fileUpload);
7773
ResumableFileUpload resumableFileUpload = fileUpload.pause();
7874
log.debug(() -> "Paused: " + resumableFileUpload);
7975

80-
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
81-
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
82-
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
76+
validateEmptyResumeToken(resumableFileUpload);
8377

8478
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
8579
resumedUpload.completionFuture().join();
@@ -89,16 +83,17 @@ void pause_singlePart_shouldResume() {
8983
void pause_fileNotChanged_shouldResume() {
9084
UploadFileRequest request = UploadFileRequest.builder()
9185
.putObjectRequest(b -> b.bucket(BUCKET).key(KEY))
86+
.addTransferListener(LoggingTransferListener.create())
9287
.source(largeFile)
9388
.build();
9489
FileUpload fileUpload = tm.uploadFile(request);
95-
waitUntilFirstByteBufferDelivered(fileUpload);
90+
waitUntilMultipartUploadExists();
9691
ResumableFileUpload resumableFileUpload = fileUpload.pause();
97-
log.info(() -> "Paused: " + resumableFileUpload);
92+
log.debug(() -> "Paused: " + resumableFileUpload);
9893

9994
assertThat(resumableFileUpload.multipartUploadId()).isNotEmpty();
10095
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();
101-
assertThat(resumableFileUpload.totalNumOfParts()).isNotEmpty();
96+
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
10297

10398
verifyMultipartUploadIdExists(resumableFileUpload);
10499

@@ -116,9 +111,7 @@ void pauseImmediately_resume_shouldStartFromBeginning() {
116111
ResumableFileUpload resumableFileUpload = fileUpload.pause();
117112
log.debug(() -> "Paused: " + resumableFileUpload);
118113

119-
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
120-
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
121-
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
114+
validateEmptyResumeToken(resumableFileUpload);
122115

123116
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
124117
resumedUpload.completionFuture().join();
@@ -131,13 +124,13 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
131124
.source(largeFile)
132125
.build();
133126
FileUpload fileUpload = tm.uploadFile(request);
134-
waitUntilFirstByteBufferDelivered(fileUpload);
127+
waitUntilMultipartUploadExists();
135128
ResumableFileUpload resumableFileUpload = fileUpload.pause();
136129
log.debug(() -> "Paused: " + resumableFileUpload);
137130

138131
assertThat(resumableFileUpload.multipartUploadId()).isNotEmpty();
139132
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();
140-
assertThat(resumableFileUpload.totalNumOfParts()).isNotEmpty();
133+
assertThat(resumableFileUpload.totalParts()).isNotEmpty();
141134
verifyMultipartUploadIdExists(resumableFileUpload);
142135

143136
byte[] bytes = "helloworld".getBytes(StandardCharsets.UTF_8);
@@ -162,14 +155,21 @@ private void verifyMultipartUploadIdNotExist(ResumableFileUpload resumableFileUp
162155
.hasCauseInstanceOf(NoSuchUploadException.class);
163156
}
164157

165-
private static void waitUntilFirstByteBufferDelivered(FileUpload upload) {
166-
Waiter<TransferProgressSnapshot> waiter = Waiter.builder(TransferProgressSnapshot.class)
167-
.addAcceptor(WaiterAcceptor.successOnResponseAcceptor(r -> r.transferredBytes() > 0))
158+
private static void waitUntilMultipartUploadExists() {
159+
Waiter<ListMultipartUploadsResponse> waiter = Waiter.builder(ListMultipartUploadsResponse.class)
160+
.addAcceptor(WaiterAcceptor.successOnResponseAcceptor(ListMultipartUploadsResponse::hasUploads))
168161
.addAcceptor(WaiterAcceptor.retryOnResponseAcceptor(r -> true))
169162
.overrideConfiguration(o -> o.waitTimeout(Duration.ofMinutes(1))
170-
.maxAttempts(Integer.MAX_VALUE)
163+
.maxAttempts(10)
171164
.backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofMillis(100))))
172165
.build();
173-
waiter.run(() -> upload.progress().snapshot());
166+
waiter.run(() -> s3.listMultipartUploads(l -> l.bucket(BUCKET)));
167+
}
168+
169+
private static void validateEmptyResumeToken(ResumableFileUpload resumableFileUpload) {
170+
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
171+
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
172+
assertThat(resumableFileUpload.totalParts()).isEmpty();
173+
assertThat(resumableFileUpload.transferredParts()).isEmpty();
174174
}
175175
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.awssdk.core.exception.SdkClientException;
3737
import software.amazon.awssdk.core.exception.SdkException;
3838
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
39+
import software.amazon.awssdk.crt.s3.ResumeToken;
3940
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
4041
import software.amazon.awssdk.services.s3.S3AsyncClient;
4142
import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
@@ -60,7 +61,6 @@
6061
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
6162
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
6263
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
63-
import software.amazon.awssdk.transfer.s3.internal.serialization.CrtUploadResumeToken;
6464
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
6565
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
6666
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
@@ -239,14 +239,10 @@ public FileUpload resumeUploadFile(ResumableFileUpload resumableFileUpload) {
239239
private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
240240
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
241241
PutObjectRequest putObjectRequest = uploadFileRequest.putObjectRequest();
242-
243-
CrtUploadResumeToken token = new CrtUploadResumeToken(resumableFileUpload.totalNumOfParts().getAsLong(),
244-
resumableFileUpload.partSizeInBytes().getAsLong(),
245-
resumableFileUpload.multipartUploadId().orElse(null));
246-
String marshalledToken = token.marshallResumeToken();
242+
ResumeToken resumeToken = crtResumeToken(resumableFileUpload);
247243

248244
Consumer<SdkHttpExecutionAttributes.Builder> attachResumeToken =
249-
b -> b.put(CRT_PAUSE_RESUME_TOKEN, marshalledToken);
245+
b -> b.put(CRT_PAUSE_RESUME_TOKEN, resumeToken);
250246

251247
PutObjectRequest modifiedPutObjectRequest = attachSdkAttribute(putObjectRequest, attachResumeToken);
252248

@@ -255,6 +251,14 @@ private FileUpload doResumeUpload(ResumableFileUpload resumableFileUpload) {
255251
.build());
256252
}
257253

254+
private static ResumeToken crtResumeToken(ResumableFileUpload resumableFileUpload) {
255+
return new ResumeToken(new ResumeToken.PutResumeTokenBuilder()
256+
.withNumPartsCompleted(resumableFileUpload.transferredParts().orElse(0L))
257+
.withTotalNumParts(resumableFileUpload.totalParts().orElse(0L))
258+
.withPartSize(resumableFileUpload.partSizeInBytes().getAsLong())
259+
.withUploadId(resumableFileUpload.multipartUploadId().orElse(null)));
260+
}
261+
258262
private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload, boolean fileModified,
259263
boolean noResumeToken) {
260264
UploadFileRequest uploadFileRequest = resumableFileUpload.uploadFileRequest();
@@ -305,7 +309,7 @@ private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload,
305309
}
306310

307311
private boolean hasResumeToken(ResumableFileUpload resumableFileUpload) {
308-
return resumableFileUpload.totalNumOfParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
312+
return resumableFileUpload.totalParts().isPresent() && resumableFileUpload.partSizeInBytes().isPresent();
309313
}
310314

311315
private PutObjectRequest attachSdkAttribute(PutObjectRequest putObjectRequest,

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileUpload.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import java.util.concurrent.CompletableFuture;
2121
import software.amazon.awssdk.annotations.SdkInternalApi;
2222
import software.amazon.awssdk.crt.CrtRuntimeException;
23+
import software.amazon.awssdk.crt.s3.ResumeToken;
2324
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
2425
import software.amazon.awssdk.transfer.s3.internal.S3ClientType;
25-
import software.amazon.awssdk.transfer.s3.internal.serialization.CrtUploadResumeToken;
2626
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
2727
import software.amazon.awssdk.transfer.s3.model.FileUpload;
2828
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
@@ -76,10 +76,10 @@ private ResumableFileUpload doPause() {
7676
.build();
7777
}
7878

79-
completionFuture.cancel(true);
79+
8080
Instant fileLastModified = Instant.ofEpochMilli(sourceFile
8181
.lastModified());
82-
String token = null;
82+
ResumeToken token = null;
8383
try {
8484
token = observable.pause();
8585
} catch (CrtRuntimeException exception) {
@@ -89,6 +89,7 @@ private ResumableFileUpload doPause() {
8989
}
9090
}
9191

92+
completionFuture.cancel(true);
9293
// Upload hasn't started yet, or it's a single object upload
9394
if (token == null) {
9495
return ResumableFileUpload.builder()
@@ -98,12 +99,11 @@ private ResumableFileUpload doPause() {
9899
.build();
99100
}
100101

101-
CrtUploadResumeToken pauseResumeToken = CrtUploadResumeToken.unmarshallResumeToken(token);
102-
103102
return ResumableFileUpload.builder()
104-
.multipartUploadId(pauseResumeToken.multipartUploadId())
105-
.totalNumOfParts(pauseResumeToken.totalNumOfParts())
106-
.partSizeInBytes(pauseResumeToken.partSizeInBytes())
103+
.multipartUploadId(token.getUploadId())
104+
.totalParts(token.getTotalNumParts())
105+
.transferredParts(token.getNumPartsCompleted())
106+
.partSizeInBytes(token.getPartSize())
107107
.fileLastModified(fileLastModified)
108108
.fileLength(sourceFile.length())
109109
.uploadFileRequest(request)

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/serialization/CrtUploadResumeToken.java

Lines changed: 0 additions & 108 deletions
This file was deleted.

0 commit comments

Comments
 (0)