Skip to content

Commit 5167071

Browse files
committed
Allow pausing a resumed download even when the download hasn't already started.
1 parent 170afec commit 5167071

16 files changed

+255
-143
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Require setting the bytes transferred on transfer progress snapshots. This prevents programming bugs where the caller forgets to set the value and it gets defaulted to 0."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS S3 Transfer Manager",
4+
"contributor": "",
5+
"description": "Allow pausing a resumed download, even if the resumed download hasn't started."
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/DefaultS3TransferManager.java

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static software.amazon.awssdk.transfer.s3.internal.utils.ResumableRequestConverter.toDownloadFileRequestAndTransformer;
1919

2020
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionException;
2122
import software.amazon.awssdk.annotations.SdkInternalApi;
2223
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2324
import software.amazon.awssdk.arns.Arn;
@@ -33,6 +34,7 @@
3334
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
3435
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
3536
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
37+
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
3638
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
3739
import software.amazon.awssdk.transfer.s3.S3TransferManager;
3840
import software.amazon.awssdk.transfer.s3.config.S3TransferManagerOverrideConfiguration;
@@ -43,6 +45,7 @@
4345
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileDownload;
4446
import software.amazon.awssdk.transfer.s3.internal.model.DefaultFileUpload;
4547
import software.amazon.awssdk.transfer.s3.internal.model.DefaultUpload;
48+
import software.amazon.awssdk.transfer.s3.internal.progress.ResumeTransferProgress;
4649
import software.amazon.awssdk.transfer.s3.internal.progress.TransferProgressUpdater;
4750
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
4851
import software.amazon.awssdk.transfer.s3.model.CompletedDownload;
@@ -243,8 +246,7 @@ public FileDownload downloadFile(DownloadFileRequest downloadRequest) {
243246
CompletableFuture<CompletedFileDownload> returnFuture = new CompletableFuture<>();
244247
TransferProgressUpdater progressUpdater = doDownloadFile(downloadRequest, responseTransformer, returnFuture);
245248

246-
return new DefaultFileDownload(returnFuture, CompletableFuture.completedFuture(progressUpdater.progress()),
247-
CompletableFuture.completedFuture(downloadRequest));
249+
return new DefaultFileDownload(returnFuture, progressUpdater.progress(), () -> downloadRequest, null);
248250
}
249251

250252
private TransferProgressUpdater doDownloadFile(
@@ -285,26 +287,43 @@ public FileDownload resumeDownloadFile(ResumableFileDownload resumableFileDownlo
285287
CompletableFuture<TransferProgress> progressFuture = new CompletableFuture<>();
286288
CompletableFuture<DownloadFileRequest> newDownloadFileRequestFuture = new CompletableFuture<>();
287289

288-
s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key()))
289-
.thenAccept(headObjectResponse -> {
290-
Pair<DownloadFileRequest, AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>
291-
requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse,
292-
originalDownloadRequest);
293-
294-
DownloadFileRequest newDownloadFileRequest = requestPair.left();
295-
newDownloadFileRequestFuture.complete(newDownloadFileRequest);
296-
log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest);
297-
298-
TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest,
299-
requestPair.right(),
300-
returnFuture);
301-
progressFuture.complete(progressUpdater.progress());
302-
}).exceptionally(throwable -> {
303-
handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable);
304-
return null;
305-
});
306-
307-
return new DefaultFileDownload(returnFuture, progressFuture, newDownloadFileRequestFuture);
290+
CompletableFuture<HeadObjectResponse> headFuture =
291+
s3AsyncClient.headObject(b -> b.bucket(getObjectRequest.bucket()).key(getObjectRequest.key()));
292+
293+
// Ensure cancellations are forwarded to the head future
294+
CompletableFutureUtils.forwardExceptionTo(returnFuture, headFuture);
295+
296+
headFuture.thenAccept(headObjectResponse -> {
297+
Pair<DownloadFileRequest, AsyncResponseTransformer<GetObjectResponse, GetObjectResponse>>
298+
requestPair = toDownloadFileRequestAndTransformer(resumableFileDownload, headObjectResponse,
299+
originalDownloadRequest);
300+
301+
DownloadFileRequest newDownloadFileRequest = requestPair.left();
302+
newDownloadFileRequestFuture.complete(newDownloadFileRequest);
303+
log.debug(() -> "Sending downloadFileRequest " + newDownloadFileRequest);
304+
305+
TransferProgressUpdater progressUpdater = doDownloadFile(newDownloadFileRequest,
306+
requestPair.right(),
307+
returnFuture);
308+
progressFuture.complete(progressUpdater.progress());
309+
}).exceptionally(throwable -> {
310+
handleException(returnFuture, progressFuture, newDownloadFileRequestFuture, throwable);
311+
return null;
312+
});
313+
314+
return new DefaultFileDownload(returnFuture,
315+
new ResumeTransferProgress(progressFuture),
316+
() -> newOrOriginalRequestForPause(newDownloadFileRequestFuture, originalDownloadRequest),
317+
resumableFileDownload);
318+
}
319+
320+
private DownloadFileRequest newOrOriginalRequestForPause(CompletableFuture<DownloadFileRequest> newDownloadFuture,
321+
DownloadFileRequest originalDownloadRequest) {
322+
try {
323+
return newDownloadFuture.getNow(originalDownloadRequest);
324+
} catch (CompletionException e) {
325+
return originalDownloadRequest;
326+
}
308327
}
309328

310329
private static void handleException(CompletableFuture<CompletedFileDownload> returnFuture,

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/model/DefaultFileDownload.java

Lines changed: 42 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,128 +17,89 @@
1717

1818
import java.io.File;
1919
import java.time.Instant;
20-
import java.util.Objects;
2120
import java.util.concurrent.CompletableFuture;
21+
import java.util.function.Supplier;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
23-
import software.amazon.awssdk.core.exception.SdkClientException;
2423
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
25-
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgress;
26-
import software.amazon.awssdk.transfer.s3.internal.progress.DefaultTransferProgressSnapshot;
2724
import software.amazon.awssdk.transfer.s3.model.CompletedFileDownload;
2825
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
2926
import software.amazon.awssdk.transfer.s3.model.FileDownload;
3027
import software.amazon.awssdk.transfer.s3.model.ResumableFileDownload;
3128
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
3229
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
33-
import software.amazon.awssdk.utils.Logger;
30+
import software.amazon.awssdk.utils.Lazy;
3431
import software.amazon.awssdk.utils.ToString;
3532
import software.amazon.awssdk.utils.Validate;
3633

3734
@SdkInternalApi
3835
public final class DefaultFileDownload implements FileDownload {
39-
private static final Logger log = Logger.loggerFor(FileDownload.class);
4036
private final CompletableFuture<CompletedFileDownload> completionFuture;
41-
private final CompletableFuture<TransferProgress> progressFuture;
42-
private final CompletableFuture<DownloadFileRequest> requestFuture;
43-
private volatile ResumableFileDownload resumableFileDownload;
44-
private final Object lock = new Object();
37+
private final Lazy<ResumableFileDownload> resumableFileDownload;
38+
private final TransferProgress progress;
39+
private final Supplier<DownloadFileRequest> requestSupplier;
40+
private final ResumableFileDownload resumedDownload;
4541

4642
public DefaultFileDownload(CompletableFuture<CompletedFileDownload> completedFileDownloadFuture,
47-
CompletableFuture<TransferProgress> progressFuture,
48-
CompletableFuture<DownloadFileRequest> requestFuture) {
43+
TransferProgress progress,
44+
Supplier<DownloadFileRequest> requestSupplier,
45+
ResumableFileDownload resumedDownload) {
4946
this.completionFuture = Validate.paramNotNull(completedFileDownloadFuture, "completedFileDownloadFuture");
50-
this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture");
51-
this.requestFuture = Validate.paramNotNull(requestFuture, "requestFuture");
47+
this.progress = Validate.paramNotNull(progress, "progress");
48+
this.requestSupplier = Validate.paramNotNull(requestSupplier, "requestSupplier");
49+
this.resumableFileDownload = new Lazy<>(this::doPause);
50+
this.resumedDownload = resumedDownload;
5251
}
5352

5453
@Override
5554
public TransferProgress progress() {
56-
return progressFuture.isDone() ? progressFuture.join() :
57-
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder().build());
55+
return progress;
5856
}
5957

6058
@Override
6159
public ResumableFileDownload pause() {
62-
log.debug(() -> "Start to pause ");
63-
if (resumableFileDownload == null) {
64-
synchronized (lock) {
65-
if (resumableFileDownload == null) {
66-
completionFuture.cancel(true);
67-
68-
if (!requestFuture.isDone() || !progressFuture.isDone()) {
69-
throw SdkClientException.create("DownloadFileRequest is unknown, not able to pause. This is likely "
70-
+ "because you are trying to pause a resumed download request that "
71-
+ "hasn't started yet. Please try later");
72-
}
73-
DownloadFileRequest request = requestFuture.join();
74-
TransferProgress progress = progressFuture.join();
75-
76-
Instant s3objectLastModified = null;
77-
Long totalBytesTransferred = null;
78-
TransferProgressSnapshot snapshot = progress.snapshot();
79-
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
80-
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
81-
s3objectLastModified = getObjectResponse.lastModified();
82-
totalBytesTransferred = getObjectResponse.contentLength();
83-
}
84-
File destination = request.destination().toFile();
85-
long length = destination.length();
86-
Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified());
87-
resumableFileDownload = ResumableFileDownload.builder()
88-
.downloadFileRequest(request)
89-
.s3ObjectLastModified(s3objectLastModified)
90-
.fileLastModified(fileLastModified)
91-
.bytesTransferred(length)
92-
.totalSizeInBytes(totalBytesTransferred)
93-
.build();
94-
}
95-
96-
}
97-
}
98-
return resumableFileDownload;
99-
}
100-
101-
@Override
102-
public CompletableFuture<CompletedFileDownload> completionFuture() {
103-
return completionFuture;
60+
return resumableFileDownload.getValue();
10461
}
10562

106-
@Override
107-
public boolean equals(Object o) {
108-
if (this == o) {
109-
return true;
110-
}
111-
if (o == null || getClass() != o.getClass()) {
112-
return false;
113-
}
114-
115-
DefaultFileDownload that = (DefaultFileDownload) o;
63+
private ResumableFileDownload doPause() {
64+
completionFuture.cancel(true);
11665

117-
if (!Objects.equals(completionFuture, that.completionFuture)) {
118-
return false;
119-
}
66+
Instant s3objectLastModified = null;
67+
Long totalSizeInBytes = null;
68+
TransferProgressSnapshot snapshot = progress.snapshot();
12069

121-
if (!Objects.equals(requestFuture, that.requestFuture)) {
122-
return false;
70+
if (snapshot.sdkResponse().isPresent() && snapshot.sdkResponse().get() instanceof GetObjectResponse) {
71+
GetObjectResponse getObjectResponse = (GetObjectResponse) snapshot.sdkResponse().get();
72+
s3objectLastModified = getObjectResponse.lastModified();
73+
totalSizeInBytes = getObjectResponse.contentLength();
74+
} else if (resumedDownload != null) {
75+
s3objectLastModified = resumedDownload.s3ObjectLastModified().orElse(null);
76+
totalSizeInBytes = resumedDownload.totalSizeInBytes().orElse(null);
12377
}
12478

125-
return Objects.equals(progressFuture, that.progressFuture);
79+
DownloadFileRequest request = requestSupplier.get();
80+
File destination = request.destination().toFile();
81+
long length = destination.length();
82+
Instant fileLastModified = Instant.ofEpochMilli(destination.lastModified());
83+
return ResumableFileDownload.builder()
84+
.downloadFileRequest(request)
85+
.s3ObjectLastModified(s3objectLastModified)
86+
.fileLastModified(fileLastModified)
87+
.bytesTransferred(length)
88+
.totalSizeInBytes(totalSizeInBytes)
89+
.build();
12690
}
12791

12892
@Override
129-
public int hashCode() {
130-
int result = completionFuture != null ? completionFuture.hashCode() : 0;
131-
result = 31 * result + (requestFuture != null ? requestFuture.hashCode() : 0);
132-
result = 31 * result + (progressFuture != null ? progressFuture.hashCode() : 0);
133-
return result;
93+
public CompletableFuture<CompletedFileDownload> completionFuture() {
94+
return completionFuture;
13495
}
13596

13697
@Override
13798
public String toString() {
13899
return ToString.builder("DefaultFileDownload")
139100
.add("completionFuture", completionFuture)
140-
.add("progress", progressFuture)
141-
.add("request", requestFuture)
101+
.add("progress", progress)
102+
.add("request", requestSupplier.get())
142103
.build();
143104
}
144105
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/DefaultTransferProgressSnapshot.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ private DefaultTransferProgressSnapshot(Builder builder) {
4444
"bytesTransferred (%s) must not be greater than transferSizeInBytes (%s)",
4545
builder.bytesTransferred, builder.transferSizeInBytes);
4646
}
47+
Validate.paramNotNull(builder.bytesTransferred, "byteTransferred");
4748
this.bytesTransferred = Validate.isNotNegative(builder.bytesTransferred, "bytesTransferred");
4849
this.transferSizeInBytes = builder.transferSizeInBytes;
4950
this.sdkResponse = builder.sdkResponse;
@@ -125,7 +126,7 @@ public String toString() {
125126

126127

127128
public static final class Builder implements CopyableBuilder<Builder, DefaultTransferProgressSnapshot> {
128-
private long bytesTransferred = 0L;
129+
private Long bytesTransferred;
129130
private Long transferSizeInBytes;
130131
private SdkResponse sdkResponse;
131132

@@ -138,7 +139,7 @@ private Builder(DefaultTransferProgressSnapshot snapshot) {
138139
this.sdkResponse = snapshot.sdkResponse;
139140
}
140141

141-
public Builder bytesTransferred(long bytesTransferred) {
142+
public Builder bytesTransferred(Long bytesTransferred) {
142143
this.bytesTransferred = bytesTransferred;
143144
return this;
144145
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.transfer.s3.internal.progress;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
import software.amazon.awssdk.transfer.s3.progress.TransferProgress;
21+
import software.amazon.awssdk.transfer.s3.progress.TransferProgressSnapshot;
22+
import software.amazon.awssdk.utils.Validate;
23+
24+
/**
25+
* An implementation of {@link TransferProgress} used when resuming a transfer. This uses a bytes-transferred of 0 until the real
26+
* progress is available (when the transfer starts).
27+
*/
28+
@SdkInternalApi
29+
public class ResumeTransferProgress implements TransferProgress {
30+
private CompletableFuture<TransferProgress> progressFuture;
31+
32+
public ResumeTransferProgress(CompletableFuture<TransferProgress> progressFuture) {
33+
this.progressFuture = Validate.paramNotNull(progressFuture, "progressFuture");
34+
}
35+
36+
@Override
37+
public TransferProgressSnapshot snapshot() {
38+
if (progressFuture.isDone() && !progressFuture.isCompletedExceptionally()) {
39+
return progressFuture.join().snapshot();
40+
}
41+
return DefaultTransferProgressSnapshot.builder().bytesTransferred(0L).build();
42+
}
43+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class TransferProgressUpdater {
4646
public TransferProgressUpdater(TransferObjectRequest request,
4747
AsyncRequestBody requestBody) {
4848
DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder();
49+
snapshotBuilder.bytesTransferred(0L);
4950
getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::transferSizeInBytes);
5051
TransferProgressSnapshot snapshot = snapshotBuilder.build();
5152
progress = new DefaultTransferProgress(snapshot);
@@ -134,7 +135,7 @@ public void subscriberOnComplete() {
134135
}
135136

136137
private void resetBytesTransferred() {
137-
progress.updateAndGet(b -> b.bytesTransferred(0));
138+
progress.updateAndGet(b -> b.bytesTransferred(0L));
138139
}
139140

140141
private void incrementBytesTransferred(int numBytes) {

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/FileDownload.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public interface FileDownload extends ObjectTransfer {
3535
* The information object is serializable for persistent storage until it should be resumed.
3636
* See {@link ResumableFileDownload} for supported formats.
3737
*
38-
* @return {@link ResumableFileDownload} that can be used to resume the download
38+
* @return A {@link ResumableFileDownload} that can be used to resume the download.
3939
*/
4040
ResumableFileDownload pause();
4141

0 commit comments

Comments
 (0)