Skip to content

Semantic Text Chunking Indexing Pressure #125517

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 54 commits into from
Apr 14, 2025
Merged
Changes from 1 commit
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
bc62301
Added circuit breaker
Mikep86 Mar 19, 2025
0a58daa
Pass circuit breaker to action filter
Mikep86 Mar 20, 2025
a77516f
Estimate memory usage before performing inference
Mikep86 Mar 20, 2025
fee2c90
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 20, 2025
83395f4
Reset circuit breaker on completion of request handling
Mikep86 Mar 20, 2025
f84ff66
Calculate actual memory usage
Mikep86 Mar 20, 2025
8698ecd
Spotless
Mikep86 Mar 20, 2025
c7b1af1
Added TODOs
Mikep86 Mar 20, 2025
6458bb3
Added more comments
Mikep86 Mar 20, 2025
6b7db55
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 21, 2025
f4a4689
Track memory usage of requests that don't perform inference
Mikep86 Mar 21, 2025
c74ca3c
Fix test failures
Mikep86 Mar 21, 2025
f5e8a94
Add circuit breaker unit test
Mikep86 Mar 21, 2025
1d5e5bd
Circuit breaker test development
Mikep86 Mar 21, 2025
d93050f
Fix memory usage tracking in estimateMemoryUsage
Mikep86 Mar 24, 2025
2480955
Make circuit breaker limit setting dynamically updatable
Mikep86 Mar 24, 2025
5d76384
Updated estimateMemoryUsage to throw InferenceException
Mikep86 Mar 24, 2025
4bcff47
Updated InferenceException to retain the original message when it is …
Mikep86 Mar 24, 2025
080ae60
Added circuit breaker trips on estimated inference bytes unit test
Mikep86 Mar 24, 2025
30e0a08
[CI] Auto commit changes from spotless
elasticsearchmachine Mar 24, 2025
8939687
Increment byte counters after updating breaker
Mikep86 Mar 28, 2025
6071cb0
Check that circuit breaker usage is 0
Mikep86 Mar 28, 2025
0507d94
Add indexing pressure to plugin services
Mikep86 Mar 28, 2025
a643877
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Mar 28, 2025
a6200d5
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 1, 2025
c211ff6
Pass indexing pressure to action filter
Mikep86 Apr 1, 2025
a106f87
Pass coordinating object to AsyncBulkShardInferenceAction
Mikep86 Apr 1, 2025
4a2976c
Use coordinating indexing pressure in ShardBulkInferenceActionFilter
Mikep86 Apr 1, 2025
56a5f97
Update circuit breaker test
Mikep86 Apr 1, 2025
111bb1f
Update circuit breaker trips on estimated inference bytes test
Mikep86 Apr 1, 2025
02d70a0
Remove inference bytes circuit breaker
Mikep86 Apr 1, 2025
bf9d118
Adjust coordinating indexing pressure lifetime
Mikep86 Apr 2, 2025
456fc59
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 2, 2025
fee592b
Account for indexing pressure from source in batches
Mikep86 Apr 3, 2025
437ca6b
Account for indexing pressure from empty chunk inference updates
Mikep86 Apr 4, 2025
96f4037
Add indexing pressure from source modifications
Mikep86 Apr 4, 2025
fcf7387
Fix testIndexingPressure
Mikep86 Apr 4, 2025
e5f64ff
Fix testIndexingPressureTripsOnEstimatedInferenceBytes
Mikep86 Apr 4, 2025
b183b20
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 4, 2025
3349168
Cleanup
Mikep86 Apr 4, 2025
8a28093
Fix compilation errors
Mikep86 Apr 4, 2025
bf420c8
Added unit test
Mikep86 Apr 4, 2025
2e479a9
Revert changes to InferenceException
Mikep86 Apr 4, 2025
2330e3e
Resolve TODO
Mikep86 Apr 4, 2025
0bb32cc
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 4, 2025
0065a87
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 7, 2025
0ff48cf
Resolve TODOs
Mikep86 Apr 8, 2025
ecb8e02
Pass indexing pressure in constructor
Mikep86 Apr 8, 2025
6a3a4fd
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 8, 2025
f4aef73
Added partial failure test
Mikep86 Apr 8, 2025
118c27f
Merge branch 'main' into semantic-text_oom-circuit-breaker
elasticmachine Apr 8, 2025
8cc4402
Update docs/changelog/125517.yaml
Mikep86 Apr 11, 2025
0a138f2
Fix changelog
Mikep86 Apr 11, 2025
e8742d2
Merge branch 'main' into semantic-text_oom-circuit-breaker
Mikep86 Apr 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Circuit breaker test development
  • Loading branch information
Mikep86 committed Mar 21, 2025
commit 1d5e5bd2193c9bb184891b204cd33d9b0574f76d
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.XPackField;
Expand All @@ -75,13 +76,15 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.elasticsearch.xpack.inference.action.filter.ShardBulkInferenceActionFilter.INDICES_INFERENCE_BATCH_SIZE;
import static org.elasticsearch.xpack.inference.action.filter.ShardBulkInferenceActionFilter.getIndexRequestOrNull;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextField.getChunksFieldName;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextField.getOriginalTextFieldName;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldMapperTests.addSemanticTextInferenceResults;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomChunkedInferenceEmbedding;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticText;
import static org.elasticsearch.xpack.inference.mapper.SemanticTextFieldTests.randomSemanticTextInput;
Expand Down Expand Up @@ -491,6 +494,7 @@ public void testCircuitBreaker() throws Exception {
final int denseModelEmbeddingBytes = denseModel.getServiceSettings()
.elementType()
.getNumBytes(denseModel.getServiceSettings().dimensions());
final Function<XContentBuilder, Long> bytesUsed = b -> BytesReference.bytes(b).ramBytesUsed();
final ShardBulkInferenceActionFilter filter = createFilter(
threadPool,
Map.of(sparseModel.getInferenceEntityId(), sparseModel, denseModel.getInferenceEntityId(), denseModel),
Expand All @@ -501,27 +505,59 @@ public void testCircuitBreaker() throws Exception {

XContentBuilder doc0Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "a test value");
XContentBuilder doc1Source = IndexRequest.getXContentBuilder(XContentType.JSON, "dense_field", "another test value");
XContentBuilder doc2Source = IndexRequest.getXContentBuilder(
XContentType.JSON,
"sparse_field",
"a test value",
"dense_field",
"another test value"
);
XContentBuilder doc3Source = IndexRequest.getXContentBuilder(
XContentType.JSON,
"dense_field",
List.of("value one", " ", "value two")
);
XContentBuilder doc4Source = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", " ");
XContentBuilder doc5Source = XContentFactory.contentBuilder(XContentType.JSON);
{
doc5Source.startObject();
if (useLegacyFormat == false) {
doc5Source.field("sparse_field", "a test value");
}
addSemanticTextInferenceResults(
useLegacyFormat,
doc5Source,
List.of(randomSemanticText(useLegacyFormat, "sparse_field", sparseModel, List.of("a test value"), XContentType.JSON))
);
doc5Source.endObject();
}
XContentBuilder doc0UpdateSource = IndexRequest.getXContentBuilder(XContentType.JSON, "sparse_field", "an updated value");
XContentBuilder doc1UpdateSource = IndexRequest.getXContentBuilder(XContentType.JSON, "dense_field", null);

CountDownLatch chainExecuted = new CountDownLatch(1);
ActionFilterChain actionFilterChain = (task, action, request, listener) -> {
try {
BulkShardRequest bulkShardRequest = (BulkShardRequest) request;
assertNull(bulkShardRequest.getInferenceFieldMap());
assertThat(bulkShardRequest.items().length, equalTo(3));
assertThat(bulkShardRequest.items().length, equalTo(10));

for (BulkItemRequest item : bulkShardRequest.items()) {
assertNull(item.getPrimaryResponse());
}

assertThat(circuitBreaker.getUsed(), equalTo(0L));
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(128 + BytesReference.bytes(doc0Source).ramBytesUsed(), "doc_0");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(
denseModelEmbeddingBytes + BytesReference.bytes(doc1Source).ramBytesUsed(),
"doc_1"
);
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(128 + bytesUsed.apply(doc0Source), "doc_0");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(denseModelEmbeddingBytes + bytesUsed.apply(doc1Source), "doc_1");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(128 + denseModelEmbeddingBytes + bytesUsed.apply(doc2Source), "doc_2");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(denseModelEmbeddingBytes * 2L + bytesUsed.apply(doc3Source), "doc_3");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(bytesUsed.apply(doc4Source), "doc_4");
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(128 + bytesUsed.apply(doc0UpdateSource), "doc_0");
if (useLegacyFormat == false) {
verify(circuitBreaker).addEstimateBytesAndMaybeBreak(bytesUsed.apply(doc1UpdateSource), "doc_1");
}

// Verify that the only times that addEstimateBytesAndMaybeBreak is called is the two times verified above
verify(circuitBreaker, times(2)).addEstimateBytesAndMaybeBreak(anyLong(), anyString());
// Verify that the only times that addEstimateBytesAndMaybeBreak is called are the times verified above
verify(circuitBreaker, times(useLegacyFormat ? 6 : 7)).addEstimateBytesAndMaybeBreak(anyLong(), anyString());
} finally {
chainExecuted.countDown();
}
Expand All @@ -536,10 +572,20 @@ public void testCircuitBreaker() throws Exception {
new InferenceFieldMetadata("dense_field", denseModel.getInferenceEntityId(), new String[] { "dense_field" })
);

BulkItemRequest[] items = new BulkItemRequest[3];
BulkItemRequest[] items = new BulkItemRequest[10];
items[0] = new BulkItemRequest(0, new IndexRequest("index").id("doc_0").source(doc0Source));
items[1] = new BulkItemRequest(1, new IndexRequest("index").id("doc_1").source(doc1Source));
items[2] = new BulkItemRequest(2, new IndexRequest("index").id("doc_2").source("non_inference_field", "yet another test value"));
items[2] = new BulkItemRequest(2, new IndexRequest("index").id("doc_2").source(doc2Source));
items[3] = new BulkItemRequest(3, new IndexRequest("index").id("doc_3").source(doc3Source));
items[4] = new BulkItemRequest(4, new IndexRequest("index").id("doc_4").source(doc4Source));
items[5] = new BulkItemRequest(5, new IndexRequest("index").id("doc_5").source(doc5Source));
items[6] = new BulkItemRequest(6, new IndexRequest("index").id("doc_6").source("non_inference_field", "yet another test value"));
items[7] = new BulkItemRequest(7, new UpdateRequest().doc(new IndexRequest("index").id("doc_0").source(doc0UpdateSource)));
items[8] = new BulkItemRequest(8, new UpdateRequest().doc(new IndexRequest("index").id("doc_1").source(doc1UpdateSource)));
items[9] = new BulkItemRequest(
9,
new UpdateRequest().doc(new IndexRequest("index").id("doc_3").source("non_inference_field", "yet another updated value"))
);

BulkShardRequest request = new BulkShardRequest(new ShardId("test", "test", 0), WriteRequest.RefreshPolicy.NONE, items);
request.setInferenceFieldMap(inferenceFieldMap);
Expand Down