From 87d5952444dbe3d5e04a01eb5cdd994d13c867f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Sun, 6 Apr 2025 10:54:31 +0200 Subject: [PATCH 1/2] Add IndexingPressureMonitor to monitor large indexing operations Relates ES-11063 --- .../elasticsearch/index/IndexingPressure.java | 18 +++++- .../index/IndexingPressureMonitor.java | 56 +++++++++++++++++++ .../elasticsearch/node/NodeConstruction.java | 6 +- .../node/PluginServiceInstances.java | 4 +- .../org/elasticsearch/plugins/Plugin.java | 6 ++ 5 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java index 99164fd4b740e..649842f1f39ac 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexingPressure.java +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressure.java @@ -19,11 +19,13 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.index.stats.IndexingPressureStats; +import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -public class IndexingPressure { +public class IndexingPressure implements IndexingPressureMonitor { public static final Setting MAX_INDEXING_BYTES = Setting.memorySizeSetting( "indexing_pressure.memory.limit", @@ -127,6 +129,8 @@ public class IndexingPressure { private final long replicaLimit; private final long operationLimit; + private final List listeners = new CopyOnWriteArrayList<>(); + public IndexingPressure(Settings settings) { this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes(); this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes(); @@ -335,12 +339,14 @@ void checkLargestPrimaryOperationIsWithinLimits( long largestOperationSizeInBytes, boolean allowsOperationsBeyondSizeLimit ) { + listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes)); if (largestOperationSizeInBytes > operationLimit) { this.largeOpsRejections.getAndIncrement(); this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes); if (allowsOperationsBeyondSizeLimit == false) { this.primaryRejections.getAndIncrement(); this.primaryDocumentRejections.addAndGet(operations); + listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes)); throw new EsRejectedExecutionException( "Request contains an operation of size [" + largestOperationSizeInBytes @@ -485,4 +491,14 @@ public IndexingPressureStats stats() { totalRejectedLargeOpsBytes.get() ); } + + @Override + public long getMaxAllowedOperationSizeInBytes() { + return operationLimit; + } + + @Override + public void addListener(IndexingPressureListener listener) { + listeners.add(listener); + } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java b/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java new file mode 100644 index 0000000000000..d6979d35a0247 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/IndexingPressureMonitor.java @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index; + +/** + * Monitors indexing pressure events within the system and tracks operation sizes. + * This interface provides mechanisms to check maximum allowed operation sizes + * and register listeners for indexing pressure events. + */ +public interface IndexingPressureMonitor { + /** + * Returns the maximum allowed size in bytes for any single indexing operation. + * Operations exceeding this limit may be rejected. + * + * @return the maximum allowed operation size in bytes + */ + long getMaxAllowedOperationSizeInBytes(); + + /** + * Registers a listener to be notified of indexing pressure events. + * The listener will receive callbacks when operations are tracked or rejected. + * + * @param listener the listener to register for indexing pressure events + */ + void addListener(IndexingPressureListener listener); + + /** + * Listener interface for receiving notifications about indexing pressure events. + * Implementations can respond to tracking of primary operations and rejections + * of large indexing operations. + */ + interface IndexingPressureListener { + /** + * Called when a primary indexing operation is tracked. + * The implementation should be really lightweight as this is called in a hot path. + * + * @param largestOperationSizeInBytes the size in bytes of the largest operation tracked + */ + void onPrimaryOperationTracked(long largestOperationSizeInBytes); + + /** + * Called when a large indexing operation is rejected due to exceeding size limits. + * The implementation should be really lightweight as this is called in a hot path. + * + * @param largestOperationSizeInBytes the size in bytes of the rejected operation + */ + void onLargeIndexingOperationRejection(long largestOperationSizeInBytes); + } +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 8947a4e3fe6f2..5818e6ef3ba84 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -930,6 +930,8 @@ public Map queryFields() { metadataCreateIndexService ); + final IndexingPressure indexingLimits = new IndexingPressure(settings); + PluginServiceInstances pluginServices = new PluginServiceInstances( client, clusterService, @@ -952,7 +954,8 @@ public Map queryFields() { documentParsingProvider, taskManager, projectResolver, - slowLogFieldProvider + slowLogFieldProvider, + indexingLimits ); Collection pluginComponents = pluginsService.flatMap(plugin -> { @@ -985,7 +988,6 @@ public Map queryFields() { .map(TerminationHandlerProvider::handler); terminationHandler = getSinglePlugin(terminationHandlers, TerminationHandler.class).orElse(null); - final IndexingPressure indexingLimits = new IndexingPressure(settings); final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(client, indexingLimits); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); diff --git a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java index aeb7646524582..e1d4c81c4d087 100644 --- a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java +++ b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java @@ -20,6 +20,7 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.features.FeatureService; +import org.elasticsearch.index.IndexingPressureMonitor; import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.SystemIndices; @@ -55,5 +56,6 @@ public record PluginServiceInstances( DocumentParsingProvider documentParsingProvider, TaskManager taskManager, ProjectResolver projectResolver, - SlowLogFieldProvider slowLogFieldProvider + SlowLogFieldProvider slowLogFieldProvider, + IndexingPressureMonitor indexingPressureMonitor ) implements Plugin.PluginServices {} diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 4a3990e3fd3c3..a931f686b7279 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -29,6 +29,7 @@ import org.elasticsearch.features.FeatureService; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettingProvider; +import org.elasticsearch.index.IndexingPressureMonitor; import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.SystemIndices; @@ -186,6 +187,11 @@ public interface PluginServices { * Provider for additional SlowLog fields */ SlowLogFieldProvider slowLogFieldProvider(); + + /** + * Monitors indexing pressure events within the system and tracks operation sizes. + */ + IndexingPressureMonitor indexingPressureMonitor(); } /** From ed44a544794b21322e1181981804771e5b5571d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Sun, 6 Apr 2025 11:01:47 +0200 Subject: [PATCH 2/2] Update docs/changelog/126372.yaml --- docs/changelog/126372.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/126372.yaml diff --git a/docs/changelog/126372.yaml b/docs/changelog/126372.yaml new file mode 100644 index 0000000000000..75345296d8392 --- /dev/null +++ b/docs/changelog/126372.yaml @@ -0,0 +1,5 @@ +pr: 126372 +summary: Add `IndexingPressureMonitor` to monitor large indexing operations +area: CRUD +type: enhancement +issues: []