Skip to content

feat(S3): implements suspend and resume features for multipart upload tasks #4168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
Brennan Stehling authored and brennanMKE committed Jun 4, 2022
commit 374a93afb5e4aab6416ee9bd040ed04c05aba695
34 changes: 16 additions & 18 deletions AWSS3/AWSS3TransferUtility.m
Original file line number Diff line number Diff line change
Expand Up @@ -433,23 +433,22 @@ - (void) hydrateFromDB:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary
}
}

- (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary
tempTransferDictionary: (NSMutableDictionary *) tempTransferDictionary
completionHandler: (void (^)(NSError *_Nullable error)) completionHandler{
- (void)linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMasterTaskDictionary
tempTransferDictionary:(NSMutableDictionary *)tempTransferDictionary
completionHandler:(void (^)(NSError *_Nullable error)) completionHandler {
//Get tasks from the NSURLSession and reattach to them.
//getTasksWithCompletionHandler is an ansynchronous task, so the thread that is calling this method will not be blocked.
[self.session getTasksWithCompletionHandler:^(NSArray *dataTasks, NSArray *uploadTasks, NSArray *downloadTasks) {

//Loop through all the upload Tasks.
for( NSURLSessionUploadTask *task in uploadTasks) {
for (NSURLSessionUploadTask *task in uploadTasks) {
AWSDDLogDebug(@"Iterating through task Identifier [%lu]", (unsigned long)task.taskIdentifier);
NSError *taskError = [task error];

//Get the Task
id obj = [tempTransferDictionary objectForKey:@(task.taskIdentifier)];

if ([obj isKindOfClass:[AWSS3TransferUtilityUploadTask class]])
{

if ([obj isKindOfClass:[AWSS3TransferUtilityUploadTask class]]) {
//Found a upload task.
AWSS3TransferUtilityUploadTask *uploadTask = obj;
uploadTask.sessionTask = task;
Expand All @@ -469,11 +468,11 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster
filePath:uploadTask.file];
continue;
}

//Check if it is InProgress
if (uploadTask.status == AWSS3TransferUtilityTransferStatusInProgress) {
//Check if the the underlying NSURLSession task is completed. If so, delete the record from the DB, clean up any temp files and call the completion handler.
if ([task state] == NSURLSessionTaskStateCompleted) {
if (task.state == NSURLSessionTaskStateCompleted) {
//Set progress to 100%
uploadTask.progress.completedUnitCount = uploadTask.progress.totalUnitCount;
uploadTask.status = AWSS3TransferUtilityTransferStatusCompleted;
Expand All @@ -484,7 +483,7 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster
continue;
}
//If it is in any other status than running, then we need to recover by retrying.
if ([task state] != NSURLSessionTaskStateRunning) {
if (task.state != NSURLSessionTaskStateRunning) {
//We think the task in IN_PROGRESS. The underlying task is not running.
//Recover the situation by retrying.
[self retryUpload:uploadTask];
Expand Down Expand Up @@ -522,15 +521,15 @@ - (void) linkTransfersToNSURLSession:(NSMutableDictionary *) tempMultiPartMaster
//Check if this is in progress
if (downloadTask.status == AWSS3TransferUtilityTransferStatusInProgress) {

if ([task state] == NSURLSessionTaskStateCompleted) {
if (task.state == NSURLSessionTaskStateCompleted) {
//Set progress to 100%
downloadTask.progress.completedUnitCount = downloadTask.progress.totalUnitCount;
downloadTask.status = AWSS3TransferUtilityTransferStatusCompleted;
[self markTransferAsCompleted:downloadTask taskError:taskError temporaryFileCreated:NO filePath:@""];
continue;
}
//Check if the underlying task's status is not in Progress.
else if ([task state] != NSURLSessionTaskStateRunning) {
else if (task.state != NSURLSessionTaskStateRunning) {
//We think the task in Progress. The underlying task is not in progress.
//Recover the situation by retrying
[self retryDownload:downloadTask];
Expand Down Expand Up @@ -716,10 +715,9 @@ - (AWSS3TransferUtilityDownloadTask *)hydrateDownloadTask:(NSMutableDictionary *
}


-( AWSS3TransferUtilityMultiPartUploadTask *) hydrateMultiPartUploadTask: (NSMutableDictionary *) task
sessionIdentifier: (NSString *) sessionIdentifier
databaseQueue: (AWSFMDatabaseQueue *) databaseQueue
{
- (AWSS3TransferUtilityMultiPartUploadTask *)hydrateMultiPartUploadTask: (NSMutableDictionary *) task
sessionIdentifier:(NSString *) sessionIdentifier
databaseQueue: (AWSFMDatabaseQueue *) databaseQueue {
AWSS3TransferUtilityMultiPartUploadTask *transferUtilityMultiPartUploadTask = [AWSS3TransferUtilityMultiPartUploadTask new];
[transferUtilityMultiPartUploadTask integrateWithTransferUtility:self];
transferUtilityMultiPartUploadTask.nsURLSessionID = sessionIdentifier;
Expand Down
15 changes: 8 additions & 7 deletions AWSS3/AWSS3TransferUtilityTasks.m
Original file line number Diff line number Diff line change
Expand Up @@ -256,14 +256,15 @@ - (void)suspend {
//

// Cancel session task for all subtasks which are in progress and set status to paused
for (NSNumber *taskIdentifier in self.inProgressPartsDictionary.allKeys) {
AWSS3TransferUtilityUploadSubTask *inProgressSubTask = self.inProgressPartsDictionary[taskIdentifier];
for (AWSS3TransferUtilityUploadSubTask *inProgressSubTask in self.inProgressTasks) {
// Note: This can happen due to lack of thread-safety
if (!inProgressSubTask) {
NSCAssert(NO, @"Sub Task should not be nil!");
continue;
}

// cancel the URLSessionTask
inProgressSubTask.status = AWSS3TransferUtilityTransferStatusPaused;
[inProgressSubTask.sessionTask cancel];

AWSS3TransferUtilityUploadSubTask *subTask = [AWSS3TransferUtilityUploadSubTask new];
Expand All @@ -286,7 +287,7 @@ - (void)suspend {
AWSDDLogError(@"Error creating AWSS3TransferUtilityUploadSubTask [%@]", error);
self.error = error;
} else {
self.inProgressPartsDictionary[taskIdentifier] = nil;
self.inProgressPartsDictionary[@(inProgressSubTask.taskIdentifier)] = nil;

[AWSS3TransferUtilityDatabaseHelper updateTransferRequestInDB:inProgressSubTask.transferID partNumber:inProgressSubTask.partNumber taskIdentifier:inProgressSubTask.taskIdentifier eTag:nil status:AWSS3TransferUtilityTransferStatusCancelled retry_count:0 databaseQueue:self.databaseQueue];

Expand Down Expand Up @@ -367,8 +368,8 @@ - (void)moveInProgressAndSuspendedTasks {
}

for (AWSS3TransferUtilityUploadSubTask *aSubTask in inProgressAndSuspendedTasks) {
[self.inProgressPartsDictionary removeObjectForKey:@(aSubTask.taskIdentifier)];
[self.waitingPartsDictionary setObject:aSubTask forKey:@(aSubTask.taskIdentifier)];
self.inProgressPartsDictionary[@(aSubTask.taskIdentifier)] = nil;
self.waitingPartsDictionary[@(aSubTask.taskIdentifier)] = aSubTask;
}
}

Expand All @@ -383,7 +384,7 @@ - (void)moveWaitingTasksToInProgress:(BOOL)startTransfer {
AWSS3TransferUtilityUploadSubTask *nextSubTask = [[self.waitingPartsDictionary allValues] objectAtIndex:0];

//Add to inProgress list
[self.inProgressPartsDictionary setObject:nextSubTask forKey:@(nextSubTask.taskIdentifier)];
self.inProgressPartsDictionary[@(nextSubTask.taskIdentifier)] = nextSubTask;

//Remove it from the waitingList
[self.waitingPartsDictionary removeObjectForKey:@(nextSubTask.taskIdentifier)];
Expand All @@ -402,7 +403,7 @@ - (void)completeUploadSubTask:(AWSS3TransferUtilityUploadSubTask *)subTask

//Add it to completed parts and remove it from remaining parts.
[self.completedPartsSet addObject:subTask];
[self.inProgressPartsDictionary removeObjectForKey:@(subTask.taskIdentifier)];
self.inProgressPartsDictionary[@(subTask.taskIdentifier)] = nil;
//Update progress
self.progress.completedUnitCount = self.progress.completedUnitCount - subTask.totalBytesSent + subTask.totalBytesExpectedToSend;

Expand Down