Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.

## [[NEXT]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/vNEXT) 2024

### Bug Fixes

- Keep a single `updateReplicateStatus` method in `ReplicatesService`. (#670)

## [[8.4.0]](https://github.com/iExecBlockchainComputing/iexec-core/releases/tag/v8.4.0) 2024-02-29

### New Features
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/com/iexec/core/detector/WorkerLostDetector.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableSet;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.core.replicate.ReplicatesService;
import com.iexec.core.task.TaskService;
import com.iexec.core.worker.Worker;
Expand Down Expand Up @@ -69,7 +70,7 @@ public void detect() {
replicatesService.updateReplicateStatus(
chainTaskId,
workerWallet,
WORKER_LOST
ReplicateStatusUpdate.poolManagerRequest(WORKER_LOST)
);
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package com.iexec.core.detector.replicate;

import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.core.detector.Detector;
import com.iexec.core.replicate.Replicate;
import com.iexec.core.replicate.ReplicatesService;
Expand Down Expand Up @@ -84,8 +85,8 @@ public void detect() {
log.info("detected replicate with resultUploadTimeout [chainTaskId:{}, replicate:{}, currentStatus:{}]",
chainTaskId, uploadingReplicate.getWalletAddress(), uploadingReplicate.getCurrentStatus());

replicatesService.updateReplicateStatus(chainTaskId, uploadingReplicate.getWalletAddress(),
RESULT_UPLOAD_FAILED);
replicatesService.updateReplicateStatus(
chainTaskId, uploadingReplicate.getWalletAddress(), ReplicateStatusUpdate.poolManagerRequest(RESULT_UPLOAD_FAILED));
taskUpdateRequestManager.publishRequest(task.getChainTaskId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusDetails;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.commons.poco.chain.ChainContributionStatus;
import com.iexec.commons.poco.chain.ChainReceipt;
import com.iexec.core.chain.IexecHubService;
Expand Down Expand Up @@ -185,7 +186,8 @@ private void updateReplicateStatuses(Task task, Replicate replicate) {
// by default, no need to retrieve anything
break;
}
replicatesService.updateReplicateStatus(chainTaskId, wallet, statusToUpdate, details);
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(statusToUpdate, details);
replicatesService.updateReplicateStatus(chainTaskId, wallet, statusUpdate);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -207,8 +207,8 @@ private boolean acceptOrRejectTask(Task task, String walletAddress) {
* tries to accept the task - i.e. create a new {@link Replicate}
* for that task on that worker.
*
* @param task {@link Task} needing at least one new {@link Replicate}.
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
* @param task {@link Task} needing at least one new {@link Replicate}.
* @param walletAddress Wallet address of a worker looking for new {@link Task}.
* @param replicatesList Replicates of given {@link Task}.
* @return {@literal true} if the task has been accepted,
* {@literal false} otherwise.
Expand Down Expand Up @@ -364,7 +364,8 @@ private Optional<TaskNotificationType> recoverReplicateInContributionPhase(Task

if (didReplicateStartContributing && didReplicateContributeOnChain) {
ReplicateStatusDetails details = new ReplicateStatusDetails(blockNumber);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, CONTRIBUTED, details);
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(CONTRIBUTED, details);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
}

// we read the replicate from db to consider the changes added in the previous case
Expand Down Expand Up @@ -419,7 +420,8 @@ private Optional<TaskNotificationType> recoverReplicateInRevealPhase(Task task,

if (didReplicateStartRevealing && didReplicateRevealOnChain) {
ReplicateStatusDetails details = new ReplicateStatusDetails(blockNumber);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, REVEALED, details);
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(REVEALED, details);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
taskUpdateRequestManager.publishRequest(chainTaskId);
}

Expand Down Expand Up @@ -474,7 +476,8 @@ private Optional<TaskNotificationType> recoverReplicateInResultUploadPhase(Task
}

if (didReplicateStartUploading && didReplicateUploadWithoutNotifying) {
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, RESULT_UPLOADED);
final ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(RESULT_UPLOADED);
replicatesService.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);

taskUpdateRequestManager.publishRequest(chainTaskId);
return Optional.of(TaskNotificationType.PLEASE_WAIT);
Expand Down
19 changes: 7 additions & 12 deletions src/main/java/com/iexec/core/replicate/ReplicatesController.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,8 +58,8 @@ public ReplicatesController(ReplicatesService replicatesService,

@GetMapping("/replicates/available")
public ResponseEntity<ReplicateTaskSummary> getAvailableReplicateTaskSummary(
@RequestParam(name = "blockNumber") long blockNumber,
@RequestHeader("Authorization") String bearerToken) {
@RequestParam(name = "blockNumber") long blockNumber,
@RequestHeader("Authorization") String bearerToken) {
String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
if (workerWalletAddress.isEmpty()) {
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
Expand Down Expand Up @@ -106,8 +106,8 @@ public ResponseEntity<List<TaskNotification>> getMissedTaskNotifications(
* To avoid body deserialization from a {@link FeignException}, a notification with a non-null body
* is sent with a 2XX HTTP status code.
*
* @param bearerToken Authentication token of a worker.
* @param chainTaskId ID of the task on which the worker has an update.
* @param bearerToken Authentication token of a worker.
* @param chainTaskId ID of the task on which the worker has an update.
* @param statusUpdate Status update sent by the worker.
* @return A notification to the worker. A notification is implemented in {@code TaskNotificationType}.
*/
Expand Down Expand Up @@ -142,13 +142,8 @@ public ResponseEntity<TaskNotificationType> updateReplicateStatus(
}
}

final UpdateReplicateStatusArgs updateReplicateStatusArgs = replicatesService.computeUpdateReplicateStatusArgs(
chainTaskId,
walletAddress,
statusUpdate);

final Either<ReplicateStatusUpdateError, TaskNotificationType> updateResult = replicatesService
.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate, updateReplicateStatusArgs);
.updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
if (updateResult.isRight()) {
return ResponseEntity.ok(updateResult.get());
}
Expand All @@ -159,7 +154,7 @@ public ResponseEntity<TaskNotificationType> updateReplicateStatus(
.body(TaskNotificationType.PLEASE_WAIT);
case NO_ERROR:
log.warn("An error has been detected on replicate update but no error is returned" +
" [chainTaskId:{}, statusUpdate:{}]", chainTaskId, statusUpdate);
" [chainTaskId:{}, statusUpdate:{}]", chainTaskId, statusUpdate);
return ResponseEntity.internalServerError().build();
case UNKNOWN_REPLICATE:
case UNKNOWN_TASK:
Expand Down
70 changes: 6 additions & 64 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -267,50 +267,6 @@ public UpdateReplicateStatusArgs computeUpdateReplicateStatusArgs(String chainTa
.build();
}

/*
* This implicitly sets the modifier to POOL_MANAGER
*
* @Retryable is needed as it isn't triggered by a call from within the class itself.
*/
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus) {
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(newStatus);
updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
}

@Recover
public void updateReplicateStatus(OptimisticLockingFailureException exception,
String chainTaskId,
String walletAddress,
ReplicateStatus newStatus) {
logUpdateReplicateStatusRecover(exception);
}

/*
* This implicitly sets the modifier to POOL_MANAGER
*
* @Retryable is needed as it isn't triggered by a call from within the class itself.
*/
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public void updateReplicateStatus(String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusDetails details) {
ReplicateStatusUpdate statusUpdate = ReplicateStatusUpdate.poolManagerRequest(newStatus, details);
updateReplicateStatus(chainTaskId, walletAddress, statusUpdate);
}

@Recover
public void updateReplicateStatus(OptimisticLockingFailureException exception,
String chainTaskId,
String walletAddress,
ReplicateStatus newStatus,
ReplicateStatusDetails details) {
logUpdateReplicateStatusRecover(exception);
}

@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatus(
String chainTaskId,
Expand All @@ -334,7 +290,9 @@ public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateS
String chainTaskId,
String walletAddress,
ReplicateStatusUpdate statusUpdate) {
logUpdateReplicateStatusRecover(exception);
final String details = String.format("[chainTaskId:%s, walletAddress:%s, statusUpdate:%s]",
chainTaskId, walletAddress, statusUpdate);
log.error("Could not update replicate status, maximum number of retries reached {}", details, exception);
return null;
}

Expand Down Expand Up @@ -362,8 +320,7 @@ public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateS
* @return Either a {@link ReplicateStatusUpdateError} if the status can't be updated,
* or a next action for the worker.
*/
@Retryable(value = {OptimisticLockingFailureException.class}, maxAttempts = 100)
public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatus(
Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatus(
String chainTaskId,
String walletAddress,
ReplicateStatusUpdate statusUpdate,
Expand All @@ -377,23 +334,12 @@ public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateS
);
}

@Recover
public Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatus(
OptimisticLockingFailureException exception,
String chainTaskId,
String walletAddress,
ReplicateStatusUpdate statusUpdate,
UpdateReplicateStatusArgs updateReplicateStatusArgs) {
logUpdateReplicateStatusRecover(exception);
return null;
}

/**
* This method updates a replicate but does not care about thread safety.
* A single replicate can then be updated twice at the same time
* and completely break a task.
* This method has to be used with a synchronization mechanism, e.g.
* {@link ReplicatesService#updateReplicateStatus(String, String, ReplicateStatus, ReplicateStatusDetails)}
* {@link ReplicatesService#updateReplicateStatus(String, String, ReplicateStatusUpdate, UpdateReplicateStatusArgs)}
*
* @param chainTaskId Chain task id of the task whose replicate should be updated.
* @param walletAddress Wallet address of the worker whose replicate should be updated.
Expand Down Expand Up @@ -466,10 +412,6 @@ Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatusWi
return Either.right(nextAction);
}

private void logUpdateReplicateStatusRecover(OptimisticLockingFailureException exception) {
log.error("Could not update replicate status, maximum number of retries reached", exception);
}

private boolean canUpdateToBlockchainSuccess(String chainTaskId,
Replicate replicate,
ReplicateStatusUpdate statusUpdate,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import static com.iexec.common.replicate.ReplicateStatus.FAILED;
import static com.iexec.common.replicate.ReplicateStatusCause.TASK_NOT_ACTIVE;

@Slf4j
Expand Down Expand Up @@ -86,7 +87,7 @@ public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
* */
if (ReplicateStatus.getUncompletableStatuses().contains(newStatus)) {
replicatesService.updateReplicateStatus(event.getChainTaskId(),
event.getWalletAddress(), ReplicateStatus.FAILED);
event.getWalletAddress(), ReplicateStatusUpdate.poolManagerRequest(FAILED));
}

/*
Expand Down
10 changes: 6 additions & 4 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 IEXEC BLOCKCHAIN TECH
* Copyright 2021-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import com.iexec.blockchain.api.BlockchainAdapterService;
import com.iexec.common.replicate.ReplicateStatus;
import com.iexec.common.replicate.ReplicateStatusUpdate;
import com.iexec.commons.poco.chain.ChainReceipt;
import com.iexec.commons.poco.chain.ChainTask;
import com.iexec.commons.poco.chain.ChainTaskStatus;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.iexec.common.replicate.ReplicateStatus.RESULT_UPLOAD_REQUESTED;
import static com.iexec.core.task.TaskStatus.*;

@Service
Expand Down Expand Up @@ -631,7 +633,7 @@ void requestUpload(Task task) {
boolean isThereAWorkerUploading = replicatesService
.getNbReplicatesWithCurrentStatus(task.getChainTaskId(),
ReplicateStatus.RESULT_UPLOADING,
ReplicateStatus.RESULT_UPLOAD_REQUESTED) > 0;
RESULT_UPLOAD_REQUESTED) > 0;

if (isThereAWorkerUploading) {
log.info("Upload is requested but an upload is already in process. [chainTaskId: {}]", task.getChainTaskId());
Expand All @@ -646,8 +648,8 @@ void requestUpload(Task task) {
task.setUploadingWorkerWalletAddress(replicate.getWalletAddress());
taskService.updateTask(task.getChainTaskId(), Update.update("uploadingWorkerWalletAddress", replicate.getWalletAddress()));
updateTaskStatusAndSave(task, RESULT_UPLOADING);
replicatesService.updateReplicateStatus(task.getChainTaskId(), replicate.getWalletAddress(),
ReplicateStatus.RESULT_UPLOAD_REQUESTED);
replicatesService.updateReplicateStatus(
task.getChainTaskId(), replicate.getWalletAddress(), ReplicateStatusUpdate.poolManagerRequest(RESULT_UPLOAD_REQUESTED));

applicationEventPublisher.publishEvent(new PleaseUploadEvent(task.getChainTaskId(), replicate.getWalletAddress()));
}
Expand Down
Loading