Skip to content

Commit 1cc5c42

Browse files
committed
Add more tests
1 parent 95279c9 commit 1cc5c42

File tree

10 files changed

+337
-126
lines changed

10 files changed

+337
-126
lines changed

services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadPauseResumeIntegrationTest.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Duration;
2727
import org.junit.jupiter.api.AfterAll;
2828
import org.junit.jupiter.api.BeforeAll;
29+
import org.junit.jupiter.api.Disabled;
2930
import org.junit.jupiter.api.Test;
3031
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
3132
import software.amazon.awssdk.core.waiters.Waiter;
@@ -40,6 +41,8 @@
4041
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
4142
import software.amazon.awssdk.utils.Logger;
4243

44+
// TODO: re-enable tests
45+
@Disabled("Disable tests because they are flaky right now due to crt bug")
4346
public class S3TransferManagerUploadPauseResumeIntegrationTest extends S3IntegrationTestBase {
4447
private static final Logger log = Logger.loggerFor(S3TransferManagerUploadPauseResumeIntegrationTest.class);
4548
private static final String BUCKET = temporaryBucketName(S3TransferManagerUploadPauseResumeIntegrationTest.class);
@@ -85,19 +88,14 @@ void pause_singlePart_shouldResume() {
8588
FileUpload fileUpload = tm.uploadFile(request);
8689
waitUntilFirstByteBufferDelivered(fileUpload);
8790
ResumableFileUpload resumableFileUpload = fileUpload.pause();
88-
log.info(() -> "Paused: " + resumableFileUpload);
91+
log.debug(() -> "Paused: " + resumableFileUpload);
8992

9093
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
91-
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();
92-
assertThat(resumableFileUpload.totalNumOfParts()).isNotEmpty();
93-
94-
verifyMultipartUploadIdExists(resumableFileUpload);
94+
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
95+
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
9596

9697
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
9798
resumedUpload.completionFuture().join();
98-
System.out.println(resumedUpload.progress().snapshot().transferredBytes());
99-
100-
//assertThat(resumedUpload.progress().snapshot().transferSizeInBytes()).hasValue((long) bytes.length);
10199
}
102100

103101
@Test
@@ -119,9 +117,6 @@ void pause_fileNotChanged_shouldResume() {
119117

120118
FileUpload resumedUpload = tm.resumeUploadFile(resumableFileUpload);
121119
resumedUpload.completionFuture().join();
122-
System.out.println(resumedUpload.progress().snapshot().transferredBytes());
123-
124-
//assertThat(resumedUpload.progress().snapshot().transferSizeInBytes()).hasValue((long) bytes.length);
125120
}
126121

127122
@Test
@@ -132,7 +127,7 @@ void pauseImmediately_resume_shouldStartFromBeginning() {
132127
.build();
133128
FileUpload fileUpload = tm.uploadFile(request);
134129
ResumableFileUpload resumableFileUpload = fileUpload.pause();
135-
log.info(() -> "Paused: " + resumableFileUpload);
130+
log.debug(() -> "Paused: " + resumableFileUpload);
136131

137132
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
138133
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
@@ -151,7 +146,7 @@ void pause_fileChanged_resumeShouldStartFromBeginning() throws Exception {
151146
FileUpload fileUpload = tm.uploadFile(request);
152147
waitUntilFirstByteBufferDelivered(fileUpload);
153148
ResumableFileUpload resumableFileUpload = fileUpload.pause();
154-
log.info(() -> "Paused: " + resumableFileUpload);
149+
log.debug(() -> "Paused: " + resumableFileUpload);
155150

156151
assertThat(resumableFileUpload.multipartUploadId()).isNotEmpty();
157152
assertThat(resumableFileUpload.partSizeInBytes()).isNotEmpty();

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import software.amazon.awssdk.services.s3.internal.resource.S3AccessPointResource;
4242
import software.amazon.awssdk.services.s3.internal.resource.S3ArnConverter;
4343
import software.amazon.awssdk.services.s3.internal.resource.S3Resource;
44+
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
4445
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
4546
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
4647
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
@@ -260,9 +261,12 @@ private FileUpload uploadFromBeginning(ResumableFileUpload resumableFileUpload,
260261
resumableFileUpload.multipartUploadId()
261262
.ifPresent(id -> {
262263
log.debug(() -> "Aborting previous upload with multipartUploadId: " + id);
263-
s3AsyncClient.abortMultipartUpload(request -> request.bucket(putObjectRequest.bucket())
264-
.key(putObjectRequest.key())
265-
.uploadId(id))
264+
s3AsyncClient.abortMultipartUpload(
265+
AbortMultipartUploadRequest.builder()
266+
.bucket(putObjectRequest.bucket())
267+
.key(putObjectRequest.key())
268+
.uploadId(id)
269+
.build())
266270
.whenComplete((r, t) -> {
267271
if (t != null) {
268272
log.warn(() -> "Failed to abort multipart upload: " + id,

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileUpload.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,13 @@ private ResumableFileUpload doPause() {
7373
try {
7474
token = observable.pause();
7575
} catch (CrtRuntimeException exception) {
76+
// CRT throws exception if it is a single part
7677
if (!exception.errorName.equals("AWS_ERROR_UNSUPPORTED_OPERATION")) {
7778
throw exception;
7879
}
7980
}
8081

81-
// upload hasn't started yet, or it's a single object upload
82+
// Upload hasn't started yet, or it's a single object upload
8283
if (token == null) {
8384
return ResumableFileUpload.builder()
8485
.fileLastModified(fileLastModified)

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/serialization/CrtUploadResumeToken.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717

1818
import java.nio.charset.StandardCharsets;
1919
import java.util.Map;
20+
import java.util.Objects;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
2122
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
2223
import software.amazon.awssdk.protocols.jsoncore.JsonNodeParser;
2324
import software.amazon.awssdk.protocols.jsoncore.JsonWriter;
2425

2526
@SdkInternalApi
26-
public class CrtUploadResumeToken {
27+
public final class CrtUploadResumeToken {
2728

2829
private final Long totalNumOfParts;
2930
private final Long partSizeInBytes;
@@ -47,6 +48,34 @@ public String multipartUploadId() {
4748
return multipartUploadId;
4849
}
4950

51+
@Override
52+
public boolean equals(Object o) {
53+
if (this == o) {
54+
return true;
55+
}
56+
if (o == null || getClass() != o.getClass()) {
57+
return false;
58+
}
59+
60+
CrtUploadResumeToken token = (CrtUploadResumeToken) o;
61+
62+
if (!Objects.equals(totalNumOfParts, token.totalNumOfParts)) {
63+
return false;
64+
}
65+
if (!Objects.equals(partSizeInBytes, token.partSizeInBytes)) {
66+
return false;
67+
}
68+
return Objects.equals(multipartUploadId, token.multipartUploadId);
69+
}
70+
71+
@Override
72+
public int hashCode() {
73+
int result = totalNumOfParts != null ? totalNumOfParts.hashCode() : 0;
74+
result = 31 * result + (partSizeInBytes != null ? partSizeInBytes.hashCode() : 0);
75+
result = 31 * result + (multipartUploadId != null ? multipartUploadId.hashCode() : 0);
76+
return result;
77+
}
78+
5079
public static String marshallResumeToken(CrtUploadResumeToken resumeToken) {
5180
JsonWriter jsonGenerator = JsonWriter.create();
5281
jsonGenerator.writeStartObject();

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/DefaultFileUploadTest.java

Lines changed: 193 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,208 @@
1515

1616
package software.amazon.awssdk.transfer.s3.internal;
1717

18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.Mockito.when;
21+
import static software.amazon.awssdk.transfer.s3.SizeConstant.MB;
22+
23+
import com.google.common.jimfs.Jimfs;
24+
import java.io.File;
25+
import java.io.IOException;
26+
import java.nio.charset.StandardCharsets;
27+
import java.nio.file.FileSystem;
28+
import java.nio.file.Files;
29+
import java.time.Instant;
30+
import java.util.UUID;
31+
import java.util.concurrent.CompletableFuture;
1832
import nl.jqno.equalsverifier.EqualsVerifier;
33+
import org.apache.commons.lang3.RandomStringUtils;
34+
import org.junit.jupiter.api.AfterAll;
35+
import org.junit.jupiter.api.BeforeAll;
36+
import org.junit.jupiter.api.BeforeEach;
1937
import org.junit.jupiter.api.Test;
38+
import org.mockito.Mockito;
39+
import software.amazon.awssdk.crt.CrtRuntimeException;
40+
import software.amazon.awssdk.crt.s3.S3MetaRequest;
2041
import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable;
42+
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
2143
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileUpload;
44+
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
45+
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
46+
import software.amazon.awssdk.transfer.s3.model.ResumableFileUpload;
47+
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;
48+
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
49+
50+
class DefaultFileUploadTest {
51+
private S3MetaRequest metaRequest;
52+
private static FileSystem fileSystem;
53+
private static File file;
54+
private static final String TOKEN = "{\"total_num_parts\":10,\"partition_size\":8388608,"
55+
+ "\"type\":\"AWS_S3_META_REQUEST_TYPE_PUT_OBJECT\",\"multipart_upload_id\":\"someId\"}";
56+
57+
@BeforeAll
58+
public static void setUp() throws IOException {
59+
fileSystem = Jimfs.newFileSystem();
60+
file = File.createTempFile("test", UUID.randomUUID().toString());
61+
Files.write(file.toPath(), RandomStringUtils.random(2000).getBytes(StandardCharsets.UTF_8));
62+
}
2263

23-
public class DefaultFileUploadTest {
64+
@AfterAll
65+
public static void tearDown() throws IOException {
66+
file.delete();
67+
}
68+
69+
@BeforeEach
70+
void setUpBeforeEachTest() {
71+
metaRequest = Mockito.mock(S3MetaRequest.class);
72+
}
2473

2574
@Test
26-
public void equals_hashcode() {
75+
void equals_hashcode() {
2776
EqualsVerifier.forClass(DefaultFileUpload.class)
2877
.withNonnullFields("completionFuture", "progress", "request", "observable", "resumableFileUpload")
29-
.withPrefabValues(S3MetaRequestPauseObservable.class, new S3MetaRequestPauseObservable(), new S3MetaRequestPauseObservable())
78+
.withPrefabValues(S3MetaRequestPauseObservable.class, new S3MetaRequestPauseObservable(),
79+
new S3MetaRequestPauseObservable())
3080
.verify();
3181
}
82+
83+
@Test
84+
void pause_futureCompleted_shouldReturnNormally() {
85+
PutObjectResponse putObjectResponse = PutObjectResponse.builder()
86+
.build();
87+
CompletableFuture<CompletedFileUpload> future =
88+
CompletableFuture.completedFuture(CompletedFileUpload.builder()
89+
.response(putObjectResponse)
90+
.build());
91+
TransferProgress transferProgress = Mockito.mock(TransferProgress.class);
92+
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
93+
.sdkResponse(putObjectResponse)
94+
.transferredBytes(0L)
95+
.build());
96+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
97+
98+
UploadFileRequest request = uploadFileRequest();
99+
100+
DefaultFileUpload fileUpload =
101+
new DefaultFileUpload(future, transferProgress, observable, request);
102+
103+
observable.subscribe(metaRequest);
104+
105+
ResumableFileUpload resumableFileUpload = fileUpload.pause();
106+
Mockito.verify(metaRequest, Mockito.never()).pause();
107+
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
108+
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
109+
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
110+
assertThat(resumableFileUpload.fileLength()).isEqualTo(file.length());
111+
assertThat(resumableFileUpload.uploadFileRequest()).isEqualTo(request);
112+
assertThat(resumableFileUpload.fileLastModified()).isEqualTo(Instant.ofEpochMilli(file.lastModified()));
113+
}
114+
115+
@Test
116+
void pauseTwice_shouldReturnTheSame() {
117+
CompletableFuture<CompletedFileUpload> future =
118+
new CompletableFuture<>();
119+
TransferProgress transferProgress = Mockito.mock(TransferProgress.class);
120+
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
121+
.transferredBytes(1000L)
122+
.build());
123+
UploadFileRequest request = uploadFileRequest();
124+
125+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
126+
when(metaRequest.pause()).thenReturn(TOKEN);
127+
observable.subscribe(metaRequest);
128+
129+
DefaultFileUpload fileUpload =
130+
new DefaultFileUpload(future, transferProgress, observable, request);
131+
132+
ResumableFileUpload resumableFileUpload = fileUpload.pause();
133+
ResumableFileUpload resumableFileUpload2 = fileUpload.pause();
134+
135+
assertThat(resumableFileUpload).isEqualTo(resumableFileUpload2);
136+
}
137+
138+
@Test
139+
void pause_crtThrowException_shouldPropogate() {
140+
CompletableFuture<CompletedFileUpload> future =
141+
new CompletableFuture<>();
142+
TransferProgress transferProgress = Mockito.mock(TransferProgress.class);
143+
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
144+
.transferredBytes(1000L)
145+
.build());
146+
UploadFileRequest request = uploadFileRequest();
147+
148+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
149+
CrtRuntimeException exception = new CrtRuntimeException("exception");
150+
when(metaRequest.pause()).thenThrow(exception);
151+
observable.subscribe(metaRequest);
152+
153+
DefaultFileUpload fileUpload =
154+
new DefaultFileUpload(future, transferProgress, observable, request);
155+
156+
assertThatThrownBy(() -> fileUpload.pause()).isSameAs(exception);
157+
}
158+
159+
@Test
160+
void pause_futureNotComplete_shouldPause() {
161+
CompletableFuture<CompletedFileUpload> future =
162+
new CompletableFuture<>();
163+
TransferProgress transferProgress = Mockito.mock(TransferProgress.class);
164+
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
165+
.transferredBytes(0L)
166+
.build());
167+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
168+
when(metaRequest.pause()).thenReturn(TOKEN);
169+
UploadFileRequest request = uploadFileRequest();
170+
171+
DefaultFileUpload fileUpload =
172+
new DefaultFileUpload(future, transferProgress, observable, request);
173+
174+
observable.subscribe(metaRequest);
175+
176+
ResumableFileUpload resumableFileUpload = fileUpload.pause();
177+
Mockito.verify(metaRequest).pause();
178+
assertThat(resumableFileUpload.totalNumOfParts()).hasValue(10);
179+
assertThat(resumableFileUpload.partSizeInBytes()).hasValue(8 * MB);
180+
assertThat(resumableFileUpload.multipartUploadId()).hasValue("someId");
181+
assertThat(resumableFileUpload.fileLength()).isEqualTo(file.length());
182+
assertThat(resumableFileUpload.uploadFileRequest()).isEqualTo(request);
183+
assertThat(resumableFileUpload.fileLastModified()).isEqualTo(Instant.ofEpochMilli(file.lastModified()));
184+
}
185+
186+
@Test
187+
void pause_singlePart_shouldPause() {
188+
PutObjectResponse putObjectResponse = PutObjectResponse.builder()
189+
.build();
190+
CompletableFuture<CompletedFileUpload> future =
191+
new CompletableFuture<>();
192+
TransferProgress transferProgress = Mockito.mock(TransferProgress.class);
193+
when(transferProgress.snapshot()).thenReturn(DefaultTransferProgressSnapshot.builder()
194+
.sdkResponse(putObjectResponse)
195+
.transferredBytes(0L)
196+
.build());
197+
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();
198+
when(metaRequest.pause()).thenThrow(new CrtRuntimeException(6));
199+
UploadFileRequest request = uploadFileRequest();
200+
201+
DefaultFileUpload fileUpload =
202+
new DefaultFileUpload(future, transferProgress, observable, request);
203+
204+
observable.subscribe(metaRequest);
205+
206+
ResumableFileUpload resumableFileUpload = fileUpload.pause();
207+
Mockito.verify(metaRequest).pause();
208+
assertThat(resumableFileUpload.totalNumOfParts()).isEmpty();
209+
assertThat(resumableFileUpload.partSizeInBytes()).isEmpty();
210+
assertThat(resumableFileUpload.multipartUploadId()).isEmpty();
211+
assertThat(resumableFileUpload.fileLength()).isEqualTo(file.length());
212+
assertThat(resumableFileUpload.uploadFileRequest()).isEqualTo(request);
213+
assertThat(resumableFileUpload.fileLastModified()).isEqualTo(Instant.ofEpochMilli(file.lastModified()));
214+
}
215+
216+
private UploadFileRequest uploadFileRequest() {
217+
return UploadFileRequest.builder()
218+
.source(file)
219+
.putObjectRequest(p -> p.key("test").bucket("bucket"))
220+
.build();
221+
}
32222
}

0 commit comments

Comments
 (0)