Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ All notable changes to this project will be documented in this file.
- Fix web security depreciation warning. (#624)
- Move `TaskModel` and `ReplicateModel` instances creation methods to `Task` and `Replicate` classes. (#625)
- Expose `TaskLogsModel` on `TaskController` instead of `TaskLogs`. (#631)
- Remove duplicated MongoDB read on `ReplicatesList` during replicate status update. (#647)

### Dependency Upgrades

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/iexec/core/replicate/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusModifier m
}

public boolean updateStatus(ReplicateStatus newStatus, ReplicateStatusCause cause,
ReplicateStatusModifier modifier, ChainReceipt chainReceipt) {
ReplicateStatusModifier modifier, ChainReceipt chainReceipt) {
ReplicateStatusDetails details = ReplicateStatusDetails.builder()
.chainReceipt(chainReceipt)
.cause(cause)
Expand Down
54 changes: 27 additions & 27 deletions src/main/java/com/iexec/core/replicate/ReplicatesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@
@Service
public class ReplicatesService {

private ReplicatesRepository replicatesRepository;
private IexecHubService iexecHubService;
private ApplicationEventPublisher applicationEventPublisher;
private Web3jService web3jService;
private ResultService resultService;
private TaskLogsService taskLogsService;
private final ReplicatesRepository replicatesRepository;
private final IexecHubService iexecHubService;
private final ApplicationEventPublisher applicationEventPublisher;
private final Web3jService web3jService;
private final ResultService resultService;
private final TaskLogsService taskLogsService;

private final ContextualLockRunner<String> replicatesUpdateLockRunner =
new ContextualLockRunner<>(10, TimeUnit.MINUTES);
Expand Down Expand Up @@ -165,20 +165,12 @@ public Optional<Replicate> getReplicateWithResultUploadedStatus(String chainTask
* @return {@link ReplicateStatusUpdateError#NO_ERROR} if this update is OK,
* another {@link ReplicateStatusUpdateError} containing the error reason otherwise.
*/
public ReplicateStatusUpdateError canUpdateReplicateStatus(String chainTaskId,
String walletAddress,
public ReplicateStatusUpdateError canUpdateReplicateStatus(Replicate replicate,
ReplicateStatusUpdate statusUpdate,
UpdateReplicateStatusArgs updateReplicateStatusArgs) {
Optional<ReplicatesList> oReplicateList = getReplicatesList(chainTaskId);
if (oReplicateList.isEmpty() || oReplicateList.get().getReplicateOfWorker(walletAddress).isEmpty()) {
log.error("Cannot update replicate, could not get replicate [chainTaskId:{}, UpdateRequest:{}]",
chainTaskId, statusUpdate);
return ReplicateStatusUpdateError.UNKNOWN_REPLICATE;
}

ReplicatesList replicatesList = oReplicateList.get();
Replicate replicate = replicatesList.getReplicateOfWorker(walletAddress).orElseThrow(); // "get" could be used there but triggers a warning
ReplicateStatus newStatus = statusUpdate.getStatus();
final String chainTaskId = replicate.getChainTaskId();
final String walletAddress = replicate.getWalletAddress();
final ReplicateStatus newStatus = statusUpdate.getStatus();

boolean hasAlreadyTransitionedToStatus = replicate.containsStatus(newStatus);
if (hasAlreadyTransitionedToStatus) {
Expand Down Expand Up @@ -418,28 +410,36 @@ Either<ReplicateStatusUpdateError, TaskNotificationType> updateReplicateStatusWi
log.info("Replicate update request [status:{}, chainTaskId:{}, walletAddress:{}, details:{}]",
statusUpdate.getStatus(), chainTaskId, walletAddress, statusUpdate.getDetailsWithoutLogs());

final ReplicateStatusUpdateError error = canUpdateReplicateStatus(chainTaskId, walletAddress, statusUpdate, updateReplicateStatusArgs);
final Optional<ReplicatesList> oReplicatesList = getReplicatesList(chainTaskId);
final Optional<Replicate> oReplicate = oReplicatesList
.flatMap(replicatesList -> replicatesList.getReplicateOfWorker(walletAddress));
if (oReplicatesList.isEmpty() || oReplicate.isEmpty()) {
log.error("Cannot update replicate, could not get replicate [chainTaskId:{}, UpdateRequest:{}]",
chainTaskId, statusUpdate);
return Either.left(ReplicateStatusUpdateError.UNKNOWN_REPLICATE);
}
final ReplicatesList replicatesList = oReplicatesList.get();
final Replicate replicate = oReplicate.get();
final ReplicateStatus newStatus = statusUpdate.getStatus();

final ReplicateStatusUpdateError error = canUpdateReplicateStatus(replicate, statusUpdate, updateReplicateStatusArgs);
if (ReplicateStatusUpdateError.NO_ERROR != error) {
return Either.left(error);
}

ReplicatesList replicatesList = getReplicatesList(chainTaskId).orElseThrow(); // "get" could be used there but triggers a warning
Replicate replicate = replicatesList.getReplicateOfWorker(walletAddress).orElseThrow(); // "get" could be used there but triggers a warning
ReplicateStatus newStatus = statusUpdate.getStatus();

if (newStatus.equals(CONTRIBUTED)) {
if (newStatus == CONTRIBUTED) {
replicate.setContributionHash(updateReplicateStatusArgs.getChainContribution().getResultHash());
replicate.setWorkerWeight(updateReplicateStatusArgs.getWorkerWeight());
}

if (newStatus.equals(RESULT_UPLOADED)) {
if (newStatus == RESULT_UPLOADED) {
replicate.setResultLink(updateReplicateStatusArgs.getResultLink());
replicate.setChainCallbackData(updateReplicateStatusArgs.getChainCallbackData());
}

if (statusUpdate.getDetails() != null &&
(newStatus.equals(COMPUTED) || (newStatus.equals(COMPUTE_FAILED)
&& ReplicateStatusCause.APP_COMPUTE_FAILED.equals(statusUpdate.getDetails().getCause())))) {
(newStatus == COMPUTED || (newStatus == COMPUTE_FAILED
&& ReplicateStatusCause.APP_COMPUTE_FAILED == statusUpdate.getDetails().getCause()))) {
final ComputeLogs computeLogs = statusUpdate.getDetails().tailLogs().getComputeLogs();
taskLogsService.addComputeLogs(chainTaskId, computeLogs);
statusUpdate.getDetails().setComputeLogs(null);//using null here to keep light replicate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class ReplicateControllerTests {
private static final ReplicateTaskSummary REPLICATE_TASK_SUMMARY = ReplicateTaskSummary.builder()
.workerpoolAuthorization(AUTH)
.smsUrl(SMS_URL)
.build();
.build();
private static final ReplicateStatusUpdate UPDATE = ReplicateStatusUpdate.builder()
.status(ReplicateStatus.STARTED)
.build();
Expand Down Expand Up @@ -229,11 +229,11 @@ void shouldUpdateReplicate() {
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE))
.thenReturn(UPDATE_ARGS);
when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
when(replicatesService.canUpdateReplicateStatus(new Replicate(CHAIN_TASK_ID, WALLET_ADDRESS), UPDATE, UPDATE_ARGS))
.thenReturn(ReplicateStatusUpdateError.NO_ERROR);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
.thenReturn(Either.right(TaskNotificationType.PLEASE_DOWNLOAD_APP));

ResponseEntity<TaskNotificationType> response =
replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE);

Expand All @@ -258,7 +258,7 @@ void shouldUpdateReplicateAndSetWalletAddress() {
.thenReturn(true);
when(replicatesService.computeUpdateReplicateStatusArgs(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs))
.thenReturn(UPDATE_ARGS);
when(replicatesService.canUpdateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs, UPDATE_ARGS))
when(replicatesService.canUpdateReplicateStatus(new Replicate(CHAIN_TASK_ID, WALLET_ADDRESS), updateWithLogs, UPDATE_ARGS))
.thenReturn(ReplicateStatusUpdateError.NO_ERROR);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, updateWithLogs, UPDATE_ARGS))
.thenReturn(Either.right((TaskNotificationType.PLEASE_DOWNLOAD_APP)));
Expand All @@ -279,7 +279,7 @@ void shouldUpdateReplicateAndSetWalletAddress() {
void shouldNotUpdateReplicateSinceUnauthorized() {
when(jwtTokenProvider.getWalletAddressFromBearerToken(TOKEN))
.thenReturn("");

ResponseEntity<TaskNotificationType> response =
replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE);

Expand Down Expand Up @@ -321,7 +321,7 @@ void shouldReturnPleaseAbortSinceCantUpdate(ReplicateStatusUpdateError error) {
.thenReturn(UPDATE_ARGS);
when(replicatesService.updateReplicateStatus(CHAIN_TASK_ID, WALLET_ADDRESS, UPDATE, UPDATE_ARGS))
.thenReturn(Either.left(error));

ResponseEntity<TaskNotificationType> response =
replicatesController.updateReplicateStatus(TOKEN, CHAIN_TASK_ID, UPDATE);

Expand Down
Loading