Skip to content

ES-10063 Add multi-project support for more stats APIs #127650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
Expand Down Expand Up @@ -112,7 +113,12 @@ public void testFailureInConditionalProcessor() {
NodesStatsResponse r = clusterAdmin().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
int nodeCount = r.getNodes().size();
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().processorStats().get(pipelineId);
List<IngestStats.ProcessorStat> stats = r.getNodes()
.get(k)
.getIngestStats()
.processorStats()
.get(ProjectId.DEFAULT)
.get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.stats().ingestCurrent(), greaterThanOrEqualTo(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptEngine;
Expand Down Expand Up @@ -109,7 +110,10 @@ public void testIngestStatsNamesAndTypes() throws IOException {
assertThat(pipelineStat.pipelineId(), equalTo("pipeline1"));
assertThat(pipelineStat.stats().ingestCount(), equalTo(1L));

List<IngestStats.ProcessorStat> processorStats = stats.getIngestStats().processorStats().get("pipeline1");
List<IngestStats.ProcessorStat> processorStats = stats.getIngestStats()
.processorStats()
.get(ProjectId.DEFAULT)
.get("pipeline1");
assertThat(processorStats.size(), equalTo(4));

IngestStats.ProcessorStat setA = processorStats.get(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ static TransportVersion def(int id) {
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_00_0);
public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_075_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
Expand All @@ -23,6 +24,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.index.stats.IndexingPressureStats;
import org.elasticsearch.ingest.IngestStats.ProcessorStat;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
Expand Down Expand Up @@ -709,37 +711,38 @@ static class IngestStats implements ToXContentFragment {
final SortedMap<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
Map<ProjectId, Set<String>> pipelineIdsByProject = new HashMap<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String, List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat
.getIngestStats()
.processorStats()
.entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.type(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats();
if (v == null) {
return new long[] {
nodeIngestStats.ingestCount(),
nodeIngestStats.ingestFailedCount(),
nodeIngestStats.ingestCurrent(),
nodeIngestStats.ingestTimeInMillis() };
} else {
v[0] += nodeIngestStats.ingestCount();
v[1] += nodeIngestStats.ingestFailedCount();
v[2] += nodeIngestStats.ingestCurrent();
v[3] += nodeIngestStats.ingestTimeInMillis();
return v;
}
});
Map<ProjectId, Map<String, List<ProcessorStat>>> nodeProcessorStats = nodeStat.getIngestStats().processorStats();
for (Map.Entry<ProjectId, Map<String, List<ProcessorStat>>> processorStatsForProject : nodeProcessorStats.entrySet()) {
ProjectId projectId = processorStatsForProject.getKey();
for (Map.Entry<String, List<ProcessorStat>> processorStats : processorStatsForProject.getValue().entrySet()) {
pipelineIdsByProject.computeIfAbsent(projectId, k -> new HashSet<>()).add(processorStats.getKey());
for (ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.type(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats();
if (v == null) {
return new long[] {
nodeIngestStats.ingestCount(),
nodeIngestStats.ingestFailedCount(),
nodeIngestStats.ingestCurrent(),
nodeIngestStats.ingestTimeInMillis() };
} else {
v[0] += nodeIngestStats.ingestCount();
v[1] += nodeIngestStats.ingestFailedCount();
v[2] += nodeIngestStats.ingestCurrent();
v[3] += nodeIngestStats.ingestTimeInMillis();
return v;
}
});
}
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.pipelineCount = pipelineIdsByProject.values().stream().mapToInt(Set::size).sum();
this.stats = Collections.unmodifiableSortedMap(stats);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
Expand Down Expand Up @@ -482,7 +483,13 @@ public NodeIndicesStats stats(CommonStatsFlags flags, boolean includeShardsStats
}
}

return new NodeIndicesStats(commonStats, statsByIndex(this, flags), statsByShard(this, flags), includeShardsStats);
return new NodeIndicesStats(
commonStats,
statsByIndex(this, flags),
statsByShard(this, flags),
projectResolver.supportsMultipleProjects() ? projectsByIndex() : null,
includeShardsStats
);
}

static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) {
Expand Down Expand Up @@ -564,6 +571,15 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
);
}

/**
 * Builds a lookup from each index held by this node to the ID of the project
 * that owns it, according to the current cluster state. Indices whose project
 * cannot be found in the state are simply omitted from the result.
 */
private Map<Index, ProjectId> projectsByIndex() {
    final Map<Index, ProjectId> result = new HashMap<>(indices.size());
    for (IndexService indexService : indices.values()) {
        final Index index = indexService.index();
        // Only record a mapping when the cluster state actually knows the owning project.
        clusterService.state()
            .metadata()
            .lookupProject(index)
            .ifPresent(project -> result.put(index, project.id()));
    }
    return result;
}

/**
* Checks if changes (adding / removing) indices, shards and so on are allowed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -55,6 +56,8 @@
import java.util.Map;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
* Global information on indices stats running on a specific node.
*/
Expand All @@ -66,6 +69,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent {
private final CommonStats stats;
private final Map<Index, List<IndexShardStats>> statsByShard;
private final Map<Index, CommonStats> statsByIndex;
private final @Nullable Map<Index, ProjectId> projectsByIndex;

public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
Expand All @@ -87,20 +91,33 @@ public NodeIndicesStats(StreamInput in) throws IOException {
} else {
statsByIndex = new HashMap<>();
}

if (in.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) {
boolean hasProjectsByIndex = in.readBoolean();
projectsByIndex = hasProjectsByIndex ? in.readMap(Index::new, ProjectId::readFrom) : null;
} else {
projectsByIndex = null;
}
}

/**
* Constructs an instance. If the {@code projectsByIndex} argument is non-null, the project-to-index map will be stored, and the
* project IDs will be prepended to the index names when converting this instance to XContent. This is appropriate for multi-project
* clusters. If the argument is null, no project IDs will be prepended. This is appropriate for single-project clusters.
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, CommonStats> statsByIndex,
Map<Index, List<IndexShardStats>> statsByShard,
@Nullable Map<Index, ProjectId> projectsByIndex,
boolean includeShardsStats
) {
if (includeShardsStats) {
this.statsByShard = Objects.requireNonNull(statsByShard);
this.statsByShard = requireNonNull(statsByShard);
} else {
this.statsByShard = EMPTY_STATS_BY_SHARD;
}
this.statsByIndex = Objects.requireNonNull(statsByIndex);
this.statsByIndex = requireNonNull(statsByIndex);

// make a total common stats from old ones and current ones
this.stats = oldStats;
Expand All @@ -114,6 +131,7 @@ public NodeIndicesStats(
for (CommonStats indexStats : statsByIndex.values()) {
stats.add(indexStats);
}
this.projectsByIndex = projectsByIndex;
}

@Nullable
Expand Down Expand Up @@ -228,19 +246,28 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(VERSION_SUPPORTING_STATS_BY_INDEX)) {
out.writeMap(statsByIndex);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) {
out.writeBoolean(projectsByIndex != null);
if (projectsByIndex != null) {
out.writeMap(projectsByIndex);
}
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeIndicesStats that = (NodeIndicesStats) o;
return stats.equals(that.stats) && statsByShard.equals(that.statsByShard) && statsByIndex.equals(that.statsByIndex);
return stats.equals(that.stats)
&& statsByShard.equals(that.statsByShard)
&& statsByIndex.equals(that.statsByIndex)
&& Objects.equals(projectsByIndex, that.projectsByIndex);
}

@Override
public int hashCode() {
return Objects.hash(stats, statsByShard, statsByIndex);
return Objects.hash(stats, statsByShard, statsByIndex, projectsByIndex);
}

@Override
Expand All @@ -260,7 +287,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
case INDICES -> ChunkedToXContentHelper.object(
Fields.INDICES,
Iterators.map(createCommonStatsByIndex().entrySet().iterator(), entry -> (builder, params) -> {
builder.startObject(entry.getKey().getName());
builder.startObject(xContentKey(entry.getKey()));
entry.getValue().toXContent(builder, outerParams);
return builder.endObject();
})
Expand All @@ -271,7 +298,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
Iterators.flatMap(
statsByShard.entrySet().iterator(),
entry -> ChunkedToXContentHelper.array(
entry.getKey().getName(),
xContentKey(entry.getKey()),
Iterators.flatMap(
entry.getValue().iterator(),
indexShardStats -> Iterators.concat(
Expand All @@ -291,6 +318,20 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
);
}

/**
 * Returns the key under which the given index is rendered in the XContent output.
 * In single-project mode ({@code projectsByIndex == null}) this is just the index
 * name; otherwise the owning project ID is prepended, separated by a slash.
 */
private String xContentKey(Index index) {
    if (projectsByIndex == null) {
        // Single-project cluster: no project prefix.
        return index.getName();
    }
    final ProjectId projectId = projectsByIndex.get(index);
    // The project may be absent if the stats were captured after the IndexService
    // was created but before the cluster state was updated; degrade gracefully.
    final String prefix = projectId == null ? "<unknown>" : projectId.toString();
    return prefix + "/" + index.getName();
}

private Map<Index, CommonStats> createCommonStatsByIndex() {
Map<Index, CommonStats> statsMap = new HashMap<>();

Expand Down
28 changes: 13 additions & 15 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.ProjectId;
Expand All @@ -60,7 +59,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -1244,23 +1242,23 @@ private static void executePipeline(
});
}

// Don't use default project id
@FixForMultiProject
public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.getOrDefault(Metadata.DEFAULT_PROJECT_ID, ImmutableOpenMap.of()).forEach((id, holder) -> {
Pipeline pipeline = holder.pipeline;
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
collectProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
for (ProjectId projectId : pipelines.keySet()) {
pipelines.getOrDefault(projectId, ImmutableOpenMap.of()).forEach((id, holder) -> {
Pipeline pipeline = holder.pipeline;
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(projectId, id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
collectProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(projectId, id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
});
}
return statsBuilder.build();
}

Expand Down
Loading