Skip to content

Add IndexingPressureMonitor to monitor large indexing operations #126372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 15, 2025
5 changes: 5 additions & 0 deletions docs/changelog/126372.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126372
summary: Add `IndexingPressureMonitor` to monitor large indexing operations
area: CRUD
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.stats.IndexingPressureStats;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class IndexingPressure {
public class IndexingPressure implements IndexingPressureMonitor {

public static final Setting<ByteSizeValue> MAX_INDEXING_BYTES = Setting.memorySizeSetting(
"indexing_pressure.memory.limit",
Expand Down Expand Up @@ -127,6 +129,8 @@ public class IndexingPressure {
private final long replicaLimit;
private final long operationLimit;

private final List<IndexingPressureListener> listeners = new CopyOnWriteArrayList<>();

public IndexingPressure(Settings settings) {
this.lowWatermark = SPLIT_BULK_LOW_WATERMARK.get(settings).getBytes();
this.lowWatermarkSize = SPLIT_BULK_LOW_WATERMARK_SIZE.get(settings).getBytes();
Expand Down Expand Up @@ -339,12 +343,14 @@ void checkLargestPrimaryOperationIsWithinLimits(
long largestOperationSizeInBytes,
boolean allowsOperationsBeyondSizeLimit
) {
listeners.forEach(l -> l.onPrimaryOperationTracked(largestOperationSizeInBytes));
if (largestOperationSizeInBytes > operationLimit) {
this.largeOpsRejections.getAndIncrement();
this.totalRejectedLargeOpsBytes.addAndGet(largestOperationSizeInBytes);
if (allowsOperationsBeyondSizeLimit == false) {
this.primaryRejections.getAndIncrement();
this.primaryDocumentRejections.addAndGet(operations);
listeners.forEach(l -> l.onLargeIndexingOperationRejection(largestOperationSizeInBytes));
throw new EsRejectedExecutionException(
"Request contains an operation of size ["
+ largestOperationSizeInBytes
Expand Down Expand Up @@ -489,4 +495,14 @@ public IndexingPressureStats stats() {
totalRejectedLargeOpsBytes.get()
);
}

@Override
public long getMaxAllowedOperationSizeInBytes() {
return operationLimit;
}

@Override
public void addListener(IndexingPressureListener listener) {
listeners.add(listener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index;

/**
* Monitors indexing pressure events within the system and tracks operation sizes.
* This interface provides mechanisms to check maximum allowed operation sizes
* and register listeners for indexing pressure events.
*/
public interface IndexingPressureMonitor {
/**
* Returns the maximum allowed size in bytes for any single indexing operation.
* Operations exceeding this limit may be rejected.
*
* @return the maximum allowed operation size in bytes
*/
long getMaxAllowedOperationSizeInBytes();

/**
* Registers a listener to be notified of indexing pressure events.
* The listener will receive callbacks when operations are tracked or rejected.
*
* @param listener the listener to register for indexing pressure events
*/
void addListener(IndexingPressureListener listener);

/**
* Listener interface for receiving notifications about indexing pressure events.
* Implementations can respond to tracking of primary operations and rejections
* of large indexing operations.
*/
interface IndexingPressureListener {
/**
* Called when a primary indexing operation is tracked.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the largest operation tracked
*/
void onPrimaryOperationTracked(long largestOperationSizeInBytes);

/**
* Called when a large indexing operation is rejected due to exceeding size limits.
* The implementation should be really lightweight as this is called in a hot path.
*
* @param largestOperationSizeInBytes the size in bytes of the rejected operation
*/
void onLargeIndexingOperationRejection(long largestOperationSizeInBytes);
}
}
Loading