Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ All notable changes to this project will be documented in this file.
- Add prometheus endpoint with custom metrics. (#632)
- Expose version through prometheus endpoint. (#637, #639)
- Stop fetching completed tasks count from DB. (#638)
- Expose current task statuses count to Prometheus. (#640)
- Expose current task statuses count to Prometheus and `/metrics` endpoint. (#640, #654)
- Add `tasks` endpoints to `iexec-core-library`. (#645)

### Quality
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.iexec.core.task.TaskStatus;
import lombok.Builder;
import lombok.Value;

import java.math.BigInteger;
import java.util.LinkedHashMap;

@Value
@Builder
Expand All @@ -32,7 +34,7 @@ public class PlatformMetric {
int aliveAvailableCpu;
int aliveTotalGpu;
int aliveAvailableGpu;
long completedTasks;
LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
long dealEventsCount;
long dealsCount;
long replayDealsCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iexec.core.task.TaskStatus;
import org.junit.jupiter.api.Test;

import java.math.BigInteger;
import java.util.LinkedHashMap;

import static org.junit.jupiter.api.Assertions.assertEquals;

Expand All @@ -35,11 +37,38 @@ void shouldSerializeAndDeserialize() throws JsonProcessingException {
.aliveAvailableCpu(7)
.aliveTotalGpu(0)
.aliveAvailableGpu(0)
.completedTasks(1000)
.currentTaskStatusesCount(createCurrentTaskStatusesCount())
.dealEventsCount(3000)
.dealsCount(1100)
.latestBlockNumberWithDeal(BigInteger.valueOf(1_000_000L))
.build();
assertEquals(platformMetric, mapper.readValue(mapper.writeValueAsString(platformMetric), PlatformMetric.class));
}

/**
 * Builds the sample per-status counts used by the serialization test.
 * <p>
 * Statuses are inserted in the fixed order listed below and the n-th status (1-based)
 * is mapped to the value {@code n}, so the resulting map goes from
 * {@code RECEIVED -> 1} to {@code FAILED -> 21}.
 *
 * @return an insertion-ordered map holding one entry per listed status
 */
private LinkedHashMap<TaskStatus, Long> createCurrentTaskStatusesCount() {
    // Explicit ordering: do not rely on the declaration order of the enum.
    final TaskStatus[] orderedStatuses = {
            TaskStatus.RECEIVED,
            TaskStatus.INITIALIZING,
            TaskStatus.INITIALIZED,
            TaskStatus.INITIALIZE_FAILED,
            TaskStatus.RUNNING,
            TaskStatus.RUNNING_FAILED,
            TaskStatus.CONTRIBUTION_TIMEOUT,
            TaskStatus.CONSENSUS_REACHED,
            TaskStatus.REOPENING,
            TaskStatus.REOPENED,
            TaskStatus.REOPEN_FAILED,
            TaskStatus.AT_LEAST_ONE_REVEALED,
            TaskStatus.RESULT_UPLOADING,
            TaskStatus.RESULT_UPLOADED,
            TaskStatus.RESULT_UPLOAD_TIMEOUT,
            TaskStatus.FINALIZING,
            TaskStatus.FINALIZED,
            TaskStatus.FINALIZE_FAILED,
            TaskStatus.FINAL_DEADLINE_REACHED,
            TaskStatus.COMPLETED,
            TaskStatus.FAILED
    };

    final LinkedHashMap<TaskStatus, Long> countByStatus = new LinkedHashMap<>(TaskStatus.values().length);
    long nextCount = 1L;
    for (final TaskStatus status : orderedStatuses) {
        countByStatus.put(status, nextCount);
        nextCount++;
    }
    return countByStatus;
}
}
21 changes: 15 additions & 6 deletions src/main/java/com/iexec/core/metric/MetricService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@
package com.iexec.core.metric;

import com.iexec.core.chain.DealWatcherService;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
import com.iexec.core.worker.WorkerService;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;

import java.util.LinkedHashMap;

@Service
public class MetricService {
private final DealWatcherService dealWatcherService;
private final WorkerService workerService;
private final TaskService taskService;
private LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;

public MetricService(DealWatcherService dealWatcherService,
WorkerService workerService,
TaskService taskService) {
WorkerService workerService) {
this.dealWatcherService = dealWatcherService;
this.workerService = workerService;
this.taskService = taskService;

this.currentTaskStatusesCount = new LinkedHashMap<>();
}

public PlatformMetric getPlatformMetrics() {
Expand All @@ -42,11 +46,16 @@ public PlatformMetric getPlatformMetrics() {
.aliveAvailableCpu(workerService.getAliveAvailableCpu())
.aliveTotalGpu(workerService.getAliveTotalGpu())
.aliveAvailableGpu(workerService.getAliveAvailableGpu())
.completedTasks(taskService.getCompletedTasksCount())
.currentTaskStatusesCount(currentTaskStatusesCount)
.dealEventsCount(dealWatcherService.getDealEventsCount())
.dealsCount(dealWatcherService.getDealsCount())
.replayDealsCount(dealWatcherService.getReplayDealsCount())
.latestBlockNumberWithDeal(dealWatcherService.getLatestBlockNumberWithDeal())
.build();
}

/**
 * Replaces the cached per-status task counts with the ones carried by the event.
 * The cached map is the one handed to the {@code PlatformMetric} builder by
 * {@code getPlatformMetrics()}.
 *
 * @param event event holding the latest count of tasks for each status
 */
@EventListener
void onTaskStatusesCountUpdateEvent(TaskStatusesCountUpdatedEvent event) {
// The map from the event is stored as is, without being copied.
// NOTE(review): the target field is neither final nor volatile; if events can be published
// from a thread other than the ones reading the metrics, visibility of this write is not
// guaranteed - confirm the threading model.
this.currentTaskStatusesCount = event.getCurrentTaskStatusesCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024-2024 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.iexec.core.task.event;

import com.iexec.core.task.TaskStatus;
import lombok.Value;

import java.util.LinkedHashMap;

/**
 * Application event carrying a count of tasks for each {@link TaskStatus}.
 * <p>
 * The counts are held in a {@link LinkedHashMap}, so the iteration order chosen
 * by the publisher is kept for consumers.
 * NOTE(review): the map type itself is mutable and this class does not copy it;
 * publishers are presumably expected to hand over a dedicated copy - confirm.
 */
@Value
public class TaskStatusesCountUpdatedEvent {
// Number of tasks currently in each status, keyed by status.
LinkedHashMap<TaskStatus, Long> currentTaskStatusesCount;
}
51 changes: 38 additions & 13 deletions src/main/java/com/iexec/core/task/update/TaskUpdateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ class TaskUpdateManager {
private final BlockchainAdapterService blockchainAdapterService;
private final SmsService smsService;

private final Map<TaskStatus, AtomicLong> currentTaskStatusesCount;
private final ExecutorService taskStatusesCountExecutor;
private final LinkedHashMap<TaskStatus, AtomicLong> currentTaskStatusesCount;

public TaskUpdateManager(TaskService taskService,
IexecHubService iexecHubService,
Expand Down Expand Up @@ -97,23 +96,31 @@ public TaskUpdateManager(TaskService taskService,
"status", status.name()
).register(Metrics.globalRegistry);
}

this.taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
}

@PostConstruct
Future<Void> init() {
return taskStatusesCountExecutor.submit(
// The following could take a bit of time, depending on how many tasks are in DB.
// It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
// As we use AtomicLongs, the final count should be accurate - no race conditions to expect,
// even though new deals are detected during the count.
() -> currentTaskStatusesCount
.entrySet()
.parallelStream()
.forEach(entry -> entry.getValue().addAndGet(taskService.countByCurrentStatus(entry.getKey()))),
final ExecutorService taskStatusesCountExecutor = Executors.newSingleThreadExecutor();
final Future<Void> future = taskStatusesCountExecutor.submit(
this::initializeCurrentTaskStatusesCount,
null // Trick to get a `Future<Void>` instead of a `Future<?>`
);
taskStatusesCountExecutor.shutdown();
return future;
}

/**
 * Seeds every per-status counter with the number of tasks the task service reports
 * for that status, then publishes the resulting counts.
 * <p>
 * This could take a bit of time, depending on how many tasks are in DB.
 * It is expected to take ~1.7s for 1,000,000 tasks and to be linear (so, ~17s for 10,000,000 tasks).
 * As the counters are AtomicLongs and the stored count is added to their current value,
 * the final count should be accurate - no race conditions to expect,
 * even though new deals are detected during the count.
 */
private void initializeCurrentTaskStatusesCount() {
    currentTaskStatusesCount
            .entrySet()
            .parallelStream()
            .forEach(statusAndCounter -> {
                final long storedCount = taskService.countByCurrentStatus(statusAndCounter.getKey());
                statusAndCounter.getValue().addAndGet(storedCount);
            });
    publishTaskStatusesCountUpdate();
}

void updateTask(String chainTaskId) {
Expand Down Expand Up @@ -710,10 +717,28 @@ void toFailed(Task task, TaskStatus reason) {
/**
 * Moves one task from the counter of its previous status to the counter of its new status,
 * then publishes the updated counts.
 *
 * @param previousStatus status the task is leaving; its counter is decremented
 * @param newStatus      status the task is entering; its counter is incremented
 */
void updateMetricsAfterStatusUpdate(TaskStatus previousStatus, TaskStatus newStatus) {
    final AtomicLong previousStatusCounter = currentTaskStatusesCount.get(previousStatus);
    previousStatusCounter.decrementAndGet();

    final AtomicLong newStatusCounter = currentTaskStatusesCount.get(newStatus);
    newStatusCounter.incrementAndGet();

    publishTaskStatusesCountUpdate();
}

/**
 * Reacts to a {@code TaskCreatedEvent} by incrementing the {@code RECEIVED} counter
 * and publishing the updated counts.
 */
@EventListener(TaskCreatedEvent.class)
void onTaskCreatedEvent() {
// Only the RECEIVED counter is touched here; no other counter is decremented.
currentTaskStatusesCount.get(RECEIVED).incrementAndGet();
publishTaskStatusesCountUpdate();
}

/**
 * Publishes a {@link TaskStatusesCountUpdatedEvent} holding a snapshot of the current
 * per-status counters, in the same iteration order as the internal map.
 */
private void publishTaskStatusesCountUpdate() {
    // The snapshot is a fresh map of plain Long values: the original counters can't be
    // updated from outside this class, and read-only data needs no atomic class.
    final LinkedHashMap<TaskStatus, Long> snapshot = new LinkedHashMap<>();
    for (final Map.Entry<TaskStatus, AtomicLong> statusAndCounter : currentTaskStatusesCount.entrySet()) {
        snapshot.put(statusAndCounter.getKey(), statusAndCounter.getValue().get());
    }

    applicationEventPublisher.publishEvent(new TaskStatusesCountUpdatedEvent(snapshot));
}
}
62 changes: 47 additions & 15 deletions src/test/java/com/iexec/core/metric/MetricServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,19 @@
package com.iexec.core.metric;

import com.iexec.core.chain.DealWatcherService;
import com.iexec.core.task.TaskService;
import com.iexec.core.task.TaskStatus;
import com.iexec.core.task.event.TaskStatusesCountUpdatedEvent;
import com.iexec.core.worker.Worker;
import com.iexec.core.worker.WorkerService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.math.BigInteger;
import java.util.LinkedHashMap;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -38,8 +41,6 @@ class MetricServiceTests {
private DealWatcherService dealWatcherService;
@Mock
private WorkerService workerService;
@Mock
private TaskService taskService;

@InjectMocks
private MetricService metricService;
Expand All @@ -51,30 +52,61 @@ void init() {

@Test
void shouldGetPlatformMetrics() {
final LinkedHashMap<TaskStatus, Long> expectedCurrentTaskStatusesCount = createExpectedCurrentTaskStatusesCount();

List<Worker> aliveWorkers = List.of(new Worker());
when(workerService.getAliveWorkers()).thenReturn(aliveWorkers);
when(workerService.getAliveTotalCpu()).thenReturn(1);
when(workerService.getAliveAvailableCpu()).thenReturn(1);
when(workerService.getAliveTotalGpu()).thenReturn(1);
when(workerService.getAliveAvailableGpu()).thenReturn(1);
when(taskService.getCompletedTasksCount())
.thenReturn(10L);
when(dealWatcherService.getDealEventsCount()).thenReturn(10L);
when(dealWatcherService.getDealsCount()).thenReturn(8L);
when(dealWatcherService.getReplayDealsCount()).thenReturn(2L);
when(dealWatcherService.getLatestBlockNumberWithDeal()).thenReturn(BigInteger.valueOf(255L));

PlatformMetric metric = metricService.getPlatformMetrics();
assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size());
assertThat(metric.getAliveTotalCpu()).isEqualTo(1);
assertThat(metric.getAliveAvailableCpu()).isEqualTo(1);
assertThat(metric.getAliveTotalGpu()).isEqualTo(1);
assertThat(metric.getAliveAvailableGpu()).isEqualTo(1);
assertThat(metric.getCompletedTasks()).isEqualTo(10L);
assertThat(metric.getDealEventsCount()).isEqualTo(10);
assertThat(metric.getDealsCount()).isEqualTo(8);
assertThat(metric.getReplayDealsCount()).isEqualTo(2);
assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255);
Assertions.assertAll(
() -> assertThat(metric.getAliveWorkers()).isEqualTo(aliveWorkers.size()),
() -> assertThat(metric.getAliveTotalCpu()).isEqualTo(1),
() -> assertThat(metric.getAliveAvailableCpu()).isEqualTo(1),
() -> assertThat(metric.getAliveTotalGpu()).isEqualTo(1),
() -> assertThat(metric.getAliveAvailableGpu()).isEqualTo(1),
() -> assertThat(metric.getCurrentTaskStatusesCount()).isEqualTo(expectedCurrentTaskStatusesCount),
() -> assertThat(metric.getDealEventsCount()).isEqualTo(10),
() -> assertThat(metric.getDealsCount()).isEqualTo(8),
() -> assertThat(metric.getReplayDealsCount()).isEqualTo(2),
() -> assertThat(metric.getLatestBlockNumberWithDeal()).isEqualTo(255)
);
}

/**
 * Builds the expected per-status counts and feeds them to the service under test.
 * <p>
 * Statuses are inserted in the fixed order listed below and the n-th status (1-based)
 * is mapped to the value {@code n}. Besides returning the map, this helper also delivers it
 * to {@code metricService} through a {@link TaskStatusesCountUpdatedEvent}.
 *
 * @return the insertion-ordered map that was sent to the service
 */
private LinkedHashMap<TaskStatus, Long> createExpectedCurrentTaskStatusesCount() {
    // Explicit ordering: do not rely on the declaration order of the enum.
    final TaskStatus[] orderedStatuses = {
            TaskStatus.RECEIVED,
            TaskStatus.INITIALIZING,
            TaskStatus.INITIALIZED,
            TaskStatus.INITIALIZE_FAILED,
            TaskStatus.RUNNING,
            TaskStatus.RUNNING_FAILED,
            TaskStatus.CONTRIBUTION_TIMEOUT,
            TaskStatus.CONSENSUS_REACHED,
            TaskStatus.REOPENING,
            TaskStatus.REOPENED,
            TaskStatus.REOPEN_FAILED,
            TaskStatus.AT_LEAST_ONE_REVEALED,
            TaskStatus.RESULT_UPLOADING,
            TaskStatus.RESULT_UPLOADED,
            TaskStatus.RESULT_UPLOAD_TIMEOUT,
            TaskStatus.FINALIZING,
            TaskStatus.FINALIZED,
            TaskStatus.FINALIZE_FAILED,
            TaskStatus.FINAL_DEADLINE_REACHED,
            TaskStatus.COMPLETED,
            TaskStatus.FAILED
    };

    final LinkedHashMap<TaskStatus, Long> countByStatus = new LinkedHashMap<>(TaskStatus.values().length);
    long nextCount = 1L;
    for (final TaskStatus status : orderedStatuses) {
        countByStatus.put(status, nextCount);
        nextCount++;
    }

    // Side effect: the service caches these counts before the test reads the metrics.
    final TaskStatusesCountUpdatedEvent updateEvent = new TaskStatusesCountUpdatedEvent(countByStatus);
    metricService.onTaskStatusesCountUpdateEvent(updateEvent);

    return countByStatus;
}

}
Loading