
ES-10063 Add multi-project support for more stats APIs #127650


Merged 36 commits, May 21, 2025.

Changes from 7 commits:
5d90b35
Add multi-project support for more stats APIs
PeteGillinElastic May 2, 2025
2ceb353
Update test/external-modules/multi-project/build.gradle
PeteGillinElastic May 6, 2025
7cc0618
Respond to review comments
PeteGillinElastic May 6, 2025
e7ba94e
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 6, 2025
793c415
fix merge weirdness
PeteGillinElastic May 6, 2025
46d4aa4
[CI] Auto commit changes from spotless
elasticsearchmachine May 6, 2025
73143f4
Fix test compilation following upstream change to base class
PeteGillinElastic May 6, 2025
39eced1
Update x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/…
PeteGillinElastic May 7, 2025
635fd65
Make projects-by-index map nullable and omit in single-project; alway…
PeteGillinElastic May 7, 2025
600bb6c
Add a TODO
PeteGillinElastic May 7, 2025
1df27b1
update IT to reflect changed behaviour
PeteGillinElastic May 7, 2025
084d605
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 7, 2025
6e1f8a7
Switch to using XContent.Params to indicate whether it is multi-proje…
PeteGillinElastic May 12, 2025
ba44688
Refactor NodesStatsMultiProjectIT to common up repeated assertions
PeteGillinElastic May 12, 2025
23aa344
Defer use of ProjectIdResolver in REST handlers to keep tests happy
PeteGillinElastic May 12, 2025
8ef9be4
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 12, 2025
d9a9a7f
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 15, 2025
7f60ef3
Include index UUID in "unknown project" case
PeteGillinElastic May 15, 2025
3bfafbf
Make the index-to-project map empty rather than null in the BWC deser…
PeteGillinElastic May 15, 2025
1810bc4
remove a TODO that is done, and add a comment
PeteGillinElastic May 15, 2025
c29f27d
fix typo
PeteGillinElastic May 15, 2025
1fcad79
Get REST YAML tests working with project ID prefix TODO finish this
PeteGillinElastic May 19, 2025
466b2a0
As a drive-by, fix and un-suppress one of the health REST tests
PeteGillinElastic May 19, 2025
ad08d50
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 19, 2025
e0fab44
[CI] Auto commit changes from spotless
elasticsearchmachine May 19, 2025
43cde05
TODO ugh
PeteGillinElastic May 19, 2025
7ad66ab
Experiment with different stashing behaviour
PeteGillinElastic May 19, 2025
7d16998
[CI] Auto commit changes from spotless
elasticsearchmachine May 19, 2025
9d09783
Try a more sensible stash behaviour for assertions
PeteGillinElastic May 19, 2025
8770871
clarify comment
PeteGillinElastic May 19, 2025
3b3b02c
Make checkstyle happy
PeteGillinElastic May 19, 2025
dab57e6
Make the way `Assertion` works more consistent, and simplify implemen…
PeteGillinElastic May 20, 2025
918c1f3
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 20, 2025
a95a3d2
[CI] Auto commit changes from spotless
elasticsearchmachine May 20, 2025
d353627
In RestNodesStatsAction, make the XContent params to channel.request(…
PeteGillinElastic May 21, 2025
5c50366
Merge remote-tracking branch 'upstream/main' into ES-10063-mp-stats
PeteGillinElastic May 21, 2025
@@ -20,6 +20,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Requests;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@@ -112,7 +113,12 @@ public void testFailureInConditionalProcessor() {
NodesStatsResponse r = clusterAdmin().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get();
int nodeCount = r.getNodes().size();
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().processorStats().get(pipelineId);
List<IngestStats.ProcessorStat> stats = r.getNodes()
.get(k)
.getIngestStats()
.processorStats()
.get(ProjectId.DEFAULT)
.get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.stats().ingestCurrent(), greaterThanOrEqualTo(0L));
}
@@ -15,6 +15,7 @@
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.MockScriptEngine;
@@ -109,7 +110,10 @@ public void testIngestStatsNamesAndTypes() throws IOException {
assertThat(pipelineStat.pipelineId(), equalTo("pipeline1"));
assertThat(pipelineStat.stats().ingestCount(), equalTo(1L));

List<IngestStats.ProcessorStat> processorStats = stats.getIngestStats().processorStats().get("pipeline1");
List<IngestStats.ProcessorStat> processorStats = stats.getIngestStats()
.processorStats()
.get(ProjectId.DEFAULT)
.get("pipeline1");
assertThat(processorStats.size(), equalTo(4));

IngestStats.ProcessorStat setA = processorStats.get(0);
@@ -248,6 +248,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INTRODUCE_FAILURES_DEFAULT_RETENTION = def(9_071_0_00);
public static final TransportVersion FILE_SETTINGS_HEALTH_INFO = def(9_072_0_00);
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_074_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -12,6 +12,7 @@
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.NetworkModule;
@@ -23,6 +24,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.index.stats.IndexingPressureStats;
import org.elasticsearch.ingest.IngestStats.ProcessorStat;
import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.os.OsInfo;
@@ -709,37 +711,38 @@ static class IngestStats implements ToXContentFragment {
final SortedMap<String, long[]> stats;

IngestStats(final List<NodeStats> nodeStats) {
Set<String> pipelineIds = new HashSet<>();
Map<ProjectId, Set<String>> pipelineIdsByProject = new HashMap<>();
SortedMap<String, long[]> stats = new TreeMap<>();
for (NodeStats nodeStat : nodeStats) {
if (nodeStat.getIngestStats() != null) {
for (Map.Entry<String, List<org.elasticsearch.ingest.IngestStats.ProcessorStat>> processorStats : nodeStat
.getIngestStats()
.processorStats()
.entrySet()) {
pipelineIds.add(processorStats.getKey());
for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.type(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats();
if (v == null) {
return new long[] {
nodeIngestStats.ingestCount(),
nodeIngestStats.ingestFailedCount(),
nodeIngestStats.ingestCurrent(),
nodeIngestStats.ingestTimeInMillis() };
} else {
v[0] += nodeIngestStats.ingestCount();
v[1] += nodeIngestStats.ingestFailedCount();
v[2] += nodeIngestStats.ingestCurrent();
v[3] += nodeIngestStats.ingestTimeInMillis();
return v;
}
});
Map<ProjectId, Map<String, List<ProcessorStat>>> nodeProcessorStats = nodeStat.getIngestStats().processorStats();
for (Map.Entry<ProjectId, Map<String, List<ProcessorStat>>> processorStatsForProject : nodeProcessorStats.entrySet()) {
ProjectId projectId = processorStatsForProject.getKey();
for (Map.Entry<String, List<ProcessorStat>> processorStats : processorStatsForProject.getValue().entrySet()) {
pipelineIdsByProject.computeIfAbsent(projectId, k -> new HashSet<>()).add(processorStats.getKey());
for (ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.type(), (k, v) -> {
org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats();
if (v == null) {
return new long[] {
nodeIngestStats.ingestCount(),
nodeIngestStats.ingestFailedCount(),
nodeIngestStats.ingestCurrent(),
nodeIngestStats.ingestTimeInMillis() };
} else {
v[0] += nodeIngestStats.ingestCount();
v[1] += nodeIngestStats.ingestFailedCount();
v[2] += nodeIngestStats.ingestCurrent();
v[3] += nodeIngestStats.ingestTimeInMillis();
return v;
}
});
}
}
}
}
}
this.pipelineCount = pipelineIds.size();
this.pipelineCount = pipelineIdsByProject.values().stream().mapToInt(Set::size).sum();
this.stats = Collections.unmodifiableSortedMap(stats);
}
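The new `pipelineCount` line above sums the distinct pipeline IDs within each project, so a pipeline ID reused across projects is counted once per project. A minimal standalone sketch of that aggregation, using plain JDK collections and string project IDs in place of Elasticsearch's `ProjectId` (the data here is made up):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PipelineCountSketch {
    public static void main(String[] args) {
        // Hypothetical per-project pipeline IDs; "pipeline1" exists in two
        // projects and is therefore counted twice.
        Map<String, Set<String>> pipelineIdsByProject = new HashMap<>();
        pipelineIdsByProject.computeIfAbsent("default", k -> new HashSet<>()).add("pipeline1");
        pipelineIdsByProject.computeIfAbsent("default", k -> new HashSet<>()).add("pipeline2");
        pipelineIdsByProject.computeIfAbsent("project-a", k -> new HashSet<>()).add("pipeline1");

        // Same shape as the diff above: sum the per-project set sizes.
        int pipelineCount = pipelineIdsByProject.values().stream().mapToInt(Set::size).sum();
        System.out.println(pipelineCount); // 3: two in "default", one in "project-a"
    }
}
```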

@@ -39,6 +39,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
@@ -482,7 +483,13 @@ public NodeIndicesStats stats(CommonStatsFlags flags, boolean includeShardsStats
}
}

return new NodeIndicesStats(commonStats, statsByIndex(this, flags), statsByShard(this, flags), includeShardsStats);
return new NodeIndicesStats(
commonStats,
statsByIndex(this, flags),
statsByShard(this, flags),
projectsByIndex(),
includeShardsStats
);
}

static Map<Index, CommonStats> statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) {
@@ -564,6 +571,15 @@ IndexShardStats indexShardStats(final IndicesService indicesService, final Index
);
}

private Map<Index, ProjectId> projectsByIndex() {
Map<Index, ProjectId> map = new HashMap<>();
for (IndexService indexShards : indices.values()) {
Index index = indexShards.index();
clusterService.state().metadata().lookupProject(index).ifPresent(project -> map.put(index, project.id()));
}
return map;
}

/**
* Checks if changes (adding / removing) indices, shards and so on are allowed.
*
@@ -15,6 +15,8 @@
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -55,6 +57,8 @@
import java.util.Map;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

/**
* Global information on indices stats running on a specific node.
*/
@@ -66,6 +70,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent {
private final CommonStats stats;
private final Map<Index, List<IndexShardStats>> statsByShard;
private final Map<Index, CommonStats> statsByIndex;
private final Map<Index, ProjectId> projectsByIndex;
Contributor:
It's a shame we have to pass in this map ourselves (and (de)serialize it), as it's pretty much just the Metadata.ProjectLookup... It looks like we only need it for the XContent serialization, but I can't think of a way to pass the ProjectLookup to that serialization method. I do think it's worth thinking about that some more (and maybe asking others for suggestions). This is going to result in a lot more serialization over the wire.

Member Author:
I don't think it's a lot more. It's adding one ProjectId where there's already a CommonStats and a List<IndexShardStats>. But I agree it's unfortunate to be sharing copies of this around everywhere. I'll start a thread on the channel, as you suggest.

Contributor:
Ah right, the serialization works a bit differently in this class; it doesn't just serialize a full map. OK, that makes it a lot less bad, indeed.

Contributor:
Oh I just realized I'm mixing things up. I was thinking of the serialization of IngestStats. In NodeIndicesStats, this map will result in quite a bit more serialization: a whole map where we serialize every index (not sure if it's every index on the node or really all indices, but still) and every project ID.

Member Author:
FWIW, it's every index on the node. I still think that ProjectId is small compared with other stuff we're already storing for each index on the node...
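To put rough numbers on the wire-size concern discussed above, here is a standalone sketch that serializes a hypothetical 100-entry index-to-project map and counts the bytes. It uses plain `java.io` rather than Elasticsearch's `StreamOutput` (whose variable-length encoding differs), and the index and project names are invented, so treat the total only as an order-of-magnitude illustration:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class WireOverheadSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical index-name -> project-id map for one node's response.
        Map<String, String> projectsByIndex = new LinkedHashMap<>();
        for (int i = 0; i < 100; i++) {
            projectsByIndex.put("index-" + i, "project-" + (i % 5));
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(projectsByIndex.size());
            for (Map.Entry<String, String> e : projectsByIndex.entrySet()) {
                out.writeUTF(e.getKey());   // index name (a real Index also carries a UUID)
                out.writeUTF(e.getValue()); // project id
            }
        }
        System.out.println("entries=" + projectsByIndex.size() + " bytes=" + bytes.size());
        // prints entries=100 bytes=2094
    }
}
```

A couple of kilobytes for 100 indices supports the point made above: the per-index addition is small next to the `CommonStats` already carried for each index.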


public NodeIndicesStats(StreamInput in) throws IOException {
stats = new CommonStats(in);
@@ -87,20 +92,31 @@ public NodeIndicesStats(StreamInput in) throws IOException {
} else {
statsByIndex = new HashMap<>();
}

if (in.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) {
projectsByIndex = in.readMap(Index::new, ProjectId::readFrom);
} else {
projectsByIndex = Map.of();
}
}

/**
* Constructs an instance. The {@code projectsByIndex} map will be stored, and the project IDs will be prepended to the index names when
* converting this instance to XContent (except when it is the default project).
*/
public NodeIndicesStats(
CommonStats oldStats,
Map<Index, CommonStats> statsByIndex,
Map<Index, List<IndexShardStats>> statsByShard,
Map<Index, ProjectId> projectsByIndex,
Contributor:

Nice, I like this approach of passing in the index-to-project map.

boolean includeShardsStats
) {
if (includeShardsStats) {
this.statsByShard = Objects.requireNonNull(statsByShard);
this.statsByShard = requireNonNull(statsByShard);
} else {
this.statsByShard = EMPTY_STATS_BY_SHARD;
}
this.statsByIndex = Objects.requireNonNull(statsByIndex);
this.statsByIndex = requireNonNull(statsByIndex);

// make a total common stats from old ones and current ones
this.stats = oldStats;
@@ -114,6 +130,7 @@ public NodeIndicesStats(
for (CommonStats indexStats : statsByIndex.values()) {
stats.add(indexStats);
}
this.projectsByIndex = requireNonNull(projectsByIndex);
}

@Nullable
@@ -228,19 +245,25 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(VERSION_SUPPORTING_STATS_BY_INDEX)) {
out.writeMap(statsByIndex);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) {
out.writeMap(projectsByIndex);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeIndicesStats that = (NodeIndicesStats) o;
return stats.equals(that.stats) && statsByShard.equals(that.statsByShard) && statsByIndex.equals(that.statsByIndex);
return stats.equals(that.stats)
&& statsByShard.equals(that.statsByShard)
&& statsByIndex.equals(that.statsByIndex)
&& projectsByIndex.equals(that.projectsByIndex);
}

@Override
public int hashCode() {
return Objects.hash(stats, statsByShard, statsByIndex);
return Objects.hash(stats, statsByShard, statsByIndex, projectsByIndex);
}

@Override
@@ -260,7 +283,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
case INDICES -> ChunkedToXContentHelper.object(
Fields.INDICES,
Iterators.map(createCommonStatsByIndex().entrySet().iterator(), entry -> (builder, params) -> {
builder.startObject(entry.getKey().getName());
builder.startObject(xContentKey(entry.getKey()));
entry.getValue().toXContent(builder, outerParams);
return builder.endObject();
})
@@ -271,7 +294,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
Iterators.flatMap(
statsByShard.entrySet().iterator(),
entry -> ChunkedToXContentHelper.array(
entry.getKey().getName(),
xContentKey(entry.getKey()),
Iterators.flatMap(
entry.getValue().iterator(),
indexShardStats -> Iterators.concat(
@@ -291,6 +314,23 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerP
);
}

private String xContentKey(Index index) {
if (projectsByIndex.isEmpty()) { // mapping is not available if this instance came from an older node
return index.getName();
}
ProjectId projectId = projectsByIndex.get(index);
if (projectId == null) {
// This can happen if the stats were captured after the IndexService was created but before the state was updated.
// It can also happen if this instance was constructed on a node older than VERSION_SUPPORTING_STATS_BY_INDEX.
// The best we can do is handle it gracefully.
return "<unknown>/" + index.getName();
} else if (projectId.equals(Metadata.DEFAULT_PROJECT_ID)) {
return index.getName();
} else {
return projectId + "/" + index.getName();
}
}

private Map<Index, CommonStats> createCommonStatsByIndex() {
Map<Index, CommonStats> statsMap = new HashMap<>();

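The key-naming rule in `xContentKey` above can be sketched in isolation. This hypothetical version substitutes plain strings for `Index` and `ProjectId` but mirrors the same three cases (default project, non-default project, unknown project):

```java
import java.util.Map;

public class XContentKeySketch {
    private static final String DEFAULT_PROJECT = "default";

    // Mirrors the naming rule in NodeIndicesStats.xContentKey: default-project
    // indices keep their bare name, other projects get a "projectId/" prefix,
    // and an index with no known project is marked "<unknown>/".
    static String xContentKey(Map<String, String> projectsByIndex, String indexName) {
        if (projectsByIndex.isEmpty()) {
            return indexName; // stats came from a node without the mapping
        }
        String projectId = projectsByIndex.get(indexName);
        if (projectId == null) {
            return "<unknown>/" + indexName;
        } else if (projectId.equals(DEFAULT_PROJECT)) {
            return indexName;
        } else {
            return projectId + "/" + indexName;
        }
    }

    public static void main(String[] args) {
        Map<String, String> map = Map.of("logs", DEFAULT_PROJECT, "metrics", "project-a");
        System.out.println(xContentKey(map, "logs"));    // logs
        System.out.println(xContentKey(map, "metrics")); // project-a/metrics
        System.out.println(xContentKey(map, "other"));   // <unknown>/other
    }
}
```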
28 changes: 13 additions & 15 deletions server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -39,7 +39,6 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -60,7 +59,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
@@ -1244,23 +1242,23 @@ private static void executePipeline(
});
}

// Don't use default project id
@FixForMultiProject
public IngestStats stats() {
IngestStats.Builder statsBuilder = new IngestStats.Builder();
statsBuilder.addTotalMetrics(totalMetrics);
pipelines.getOrDefault(Metadata.DEFAULT_PROJECT_ID, ImmutableOpenMap.of()).forEach((id, holder) -> {
Pipeline pipeline = holder.pipeline;
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
collectProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric);
for (ProjectId projectId : pipelines.keySet()) {
pipelines.getOrDefault(projectId, ImmutableOpenMap.of()).forEach((id, holder) -> {
Pipeline pipeline = holder.pipeline;
CompoundProcessor rootProcessor = pipeline.getCompoundProcessor();
statsBuilder.addPipelineMetrics(projectId, id, pipeline.getMetrics());
List<Tuple<Processor, IngestMetric>> processorMetrics = new ArrayList<>();
collectProcessorMetrics(rootProcessor, processorMetrics);
processorMetrics.forEach(t -> {
Processor processor = t.v1();
IngestMetric processorMetric = t.v2();
statsBuilder.addProcessorMetrics(projectId, id, getProcessorName(processor), processor.getType(), processorMetric);
});
});
});
}
return statsBuilder.build();
}
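The reworked `stats()` above walks every project's pipelines instead of only the default project's. A minimal sketch of that nested iteration, with made-up data and simple records standing in for the real metric types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PerProjectStatsSketch {
    // Stand-in for the (projectId, pipelineId, metrics) tuples the builder collects.
    record PipelineMetric(String projectId, String pipelineId, long ingestCount) {}

    public static void main(String[] args) {
        // Hypothetical pipelines keyed by project, then pipeline id -> ingest count.
        Map<String, Map<String, Long>> pipelines = Map.of(
            "default", Map.of("p1", 5L),
            "project-a", Map.of("p1", 2L, "p2", 7L)
        );
        // Walk every project, as the new IngestService.stats() does, rather
        // than only Metadata.DEFAULT_PROJECT_ID.
        List<PipelineMetric> stats = new ArrayList<>();
        for (var projectEntry : pipelines.entrySet()) {
            for (var pipelineEntry : projectEntry.getValue().entrySet()) {
                stats.add(new PipelineMetric(projectEntry.getKey(), pipelineEntry.getKey(), pipelineEntry.getValue()));
            }
        }
        System.out.println(stats.size()); // 3 pipeline entries across 2 projects
    }
}
```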
