Skip to content

Commit 7da7812

Browse files
authored
fix: only trigger task status update after successful replicate status when relevant (#760)
1 parent 41312a4 commit 7da7812

File tree

4 files changed

+46
-24
lines changed

4 files changed

+46
-24
lines changed

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# x-release-please-start-version
22
version=9.0.0
33
# x-release-please-end
4-
iexecCommonsPocoVersion=5.0.0
5-
iexecCommonVersion=9.0.0
4+
iexecCommonsPocoVersion=5.1.0
5+
iexecCommonVersion=9.1.0
66
iexecBlockchainAdapterVersion=9.0.0
77
iexecResultVersion=9.0.0
88
iexecSmsVersion=9.0.0

src/main/java/com/iexec/core/replicate/listener/ReplicateListeners.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.springframework.context.event.EventListener;
2929
import org.springframework.stereotype.Component;
3030

31-
import static com.iexec.common.replicate.ReplicateStatus.FAILED;
31+
import java.util.List;
32+
33+
import static com.iexec.common.replicate.ReplicateStatus.*;
3234
import static com.iexec.common.replicate.ReplicateStatusCause.TASK_NOT_ACTIVE;
3335

3436
@Slf4j
@@ -51,23 +53,31 @@ public ReplicateListeners(final WorkerService workerService,
5153
}
5254

5355
@EventListener
54-
public void onReplicateUpdatedEvent(ReplicateUpdatedEvent event) {
55-
log.debug("Received ReplicateUpdatedEvent [chainTaskId:{}] ", event.getChainTaskId());
56+
public void onReplicateUpdatedEvent(final ReplicateUpdatedEvent event) {
57+
log.debug("Received ReplicateUpdatedEvent [chainTaskId:{}, workerAddress:{}, status:{}]",
58+
event.getChainTaskId(), event.getWalletAddress(), event.getReplicateStatusUpdate().getStatus());
5659
final ReplicateStatusUpdate statusUpdate = event.getReplicateStatusUpdate();
5760
final ReplicateStatus newStatus = statusUpdate.getStatus();
5861
final ReplicateStatusCause cause = statusUpdate.getDetails() != null ? statusUpdate.getDetails().getCause() : null;
5962

60-
taskUpdateRequestManager.publishRequest(event.getChainTaskId());
63+
// These are the only transitions that justify updating a task status
64+
// When one worker updates its replicate status to STARTED, its task status can be updated to RUNNING
65+
// When one worker updates its replicate status to RESULT_UPLOADED, its task status can be updated to RESULT_UPLOADED
66+
// The other statuses denote major PoCo events that can lead to an update of the associated task status
67+
// The check against isFailedBeforeComputed makes it possible to trigger the running2RunningFailed check in TaskUpdateManager
68+
// This guard avoids unnecessary loops in the update manager as well as unnecessary on-chain calls
69+
if (List.of(STARTED, CONTRIBUTE_AND_FINALIZE_DONE, CONTRIBUTED, REVEALED, RESULT_UPLOADED).contains(newStatus)
70+
|| newStatus.isFailedBeforeComputed()) {
71+
taskUpdateRequestManager.publishRequest(event.getChainTaskId());
72+
}
6173

6274
/*
6375
* Should release 1 CPU of given worker for this replicate if status is
6476
* "COMPUTED" or "*_FAILED" before COMPUTED
6577
*/
66-
if (newStatus == ReplicateStatus.START_FAILED
67-
|| newStatus == ReplicateStatus.APP_DOWNLOAD_FAILED
68-
|| newStatus == ReplicateStatus.DATA_DOWNLOAD_FAILED
69-
|| newStatus == ReplicateStatus.COMPUTED
70-
|| newStatus == ReplicateStatus.COMPUTE_FAILED) {
78+
if (newStatus.isFailedBeforeComputed() || newStatus == ReplicateStatus.COMPUTED) {
79+
log.info("End of replicate computation detected [chainTaskId:{}, workerAddress:{}]",
80+
event.getChainTaskId(), event.getWalletAddress());
7181
workerService.removeComputedChainTaskIdFromWorker(event.getChainTaskId(), event.getWalletAddress());
7282
}
7383

src/main/java/com/iexec/core/task/update/TaskUpdateManager.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545
import java.util.Date;
4646
import java.util.List;
4747
import java.util.Optional;
48-
import java.util.function.Predicate;
4948

5049
import static com.iexec.core.task.TaskStatus.*;
5150

@@ -496,20 +495,19 @@ private void running2RunningFailed(final Task task, final ReplicatesList replica
496495
}
497496

498497
// If not all alive workers have failed while running the task, that's not a running failure.
499-
boolean notAllReplicatesFailed = replicatesOfAliveWorkers
498+
// FIXME replace the lambda with a method reference (ReplicateStatus::isFailedBeforeComputed) when deprecated code has been removed in iexec-common
499+
final boolean allReplicatesFailed = replicatesOfAliveWorkers
500500
.stream()
501501
.map(Replicate::getLastRelevantStatus)
502-
.anyMatch(Predicate.not(ReplicateStatus::isFailedBeforeComputed));
503-
504-
if (notAllReplicatesFailed) {
505-
return;
506-
}
502+
.allMatch(replicateStatus -> replicateStatus.isFailedBeforeComputed());
507503

508504
// If all alive workers have failed on this task, its computation should be stopped.
509505
// It could denote that the task is wrong
510506
// - e.g. failing script, dataset can't be retrieved, app can't be downloaded, ...
511-
updateTaskStatusesAndSave(task, RUNNING_FAILED, FAILED);
512-
applicationEventPublisher.publishEvent(new TaskRunningFailedEvent(task.getChainTaskId()));
507+
if (allReplicatesFailed) {
508+
updateTaskStatusesAndSave(task, RUNNING_FAILED, FAILED);
509+
applicationEventPublisher.publishEvent(new TaskRunningFailedEvent(task.getChainTaskId()));
510+
}
513511
}
514512

515513
// endregion

src/test/java/com/iexec/core/replicate/listener/ReplicateListenersTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,33 @@ class ReplicateListenersTests {
5858
private ReplicatesService replicatesService;
5959
@Mock
6060
private TaskUpdateRequestManager taskUpdateRequestManager;
61-
6261
@InjectMocks
6362
private ReplicateListeners replicateListeners;
6463

64+
private final List<ReplicateStatus> statusesTriggeringTaskUpdate = List.of(
65+
STARTED, CONTRIBUTE_AND_FINALIZE_DONE, CONTRIBUTED, REVEALED, RESULT_UPLOADED,
66+
START_FAILED, APP_DOWNLOAD_FAILED, DATA_DOWNLOAD_FAILED, COMPUTE_FAILED);
67+
6568
@Test
6669
void shouldUpdateTaskOnReplicateUpdate() {
67-
final List<ReplicateStatus> someStatuses = ReplicateStatus.getSuccessStatuses(); //not exhaustive
70+
statusesTriggeringTaskUpdate.stream()
71+
.map(this::getMockReplicate)
72+
.forEach(replicateListeners::onReplicateUpdatedEvent);
73+
74+
verify(taskUpdateRequestManager, times(statusesTriggeringTaskUpdate.size())).publishRequest(any());
75+
}
76+
77+
@Test
78+
void shouldNotUpdateTaskOnReplicateUpdate() {
79+
final List<ReplicateStatus> nonTriggeringStatuses = Arrays.stream(ReplicateStatus.values())
80+
.filter(status -> !statusesTriggeringTaskUpdate.contains(status))
81+
.toList();
6882

69-
someStatuses.stream()
83+
nonTriggeringStatuses.stream()
7084
.map(this::getMockReplicate)
7185
.forEach(replicateListeners::onReplicateUpdatedEvent);
7286

73-
verify(taskUpdateRequestManager, times(someStatuses.size())).publishRequest(any());
87+
verifyNoInteractions(taskUpdateRequestManager);
7488
}
7589

7690
@Test

0 commit comments

Comments
 (0)