diff --git a/server/src/main/java/org/elasticsearch/index/codec/vectors/ES814HnswScalarQuantizedVectorsFormat.java b/server/src/main/java/org/elasticsearch/index/codec/vectors/ES814HnswScalarQuantizedVectorsFormat.java index acd5403187309..0363b0aff786a 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/vectors/ES814HnswScalarQuantizedVectorsFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/vectors/ES814HnswScalarQuantizedVectorsFormat.java @@ -65,9 +65,6 @@ public ES814HnswScalarQuantizedVectorsFormat( } this.maxConn = maxConn; this.beamWidth = beamWidth; - if (numMergeWorkers > 1 && mergeExec == null) { - throw new IllegalArgumentException("No executor service passed in when " + numMergeWorkers + " merge workers are requested"); - } if (numMergeWorkers == 1 && mergeExec != null) { throw new IllegalArgumentException("No executor service is needed as we'll use single thread to merge"); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java index 59ef10354cf54..306471be5a714 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchConcurrentMergeScheduler.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Locale; import java.util.Set; +import java.util.concurrent.Executor; /** * An extension to the {@link ConcurrentMergeScheduler} that provides tracking on merge times, total @@ -78,6 +79,17 @@ protected boolean verbose() { return super.verbose(); } + // TODO: this is temporary; remove this override and enable multithreaded merges for all kinds of merges + @Override + public Executor getIntraMergeExecutor(MergePolicy.OneMerge merge) { + // Enable multithreaded merges only for force merge operations + if 
(merge.getStoreMergeInfo().mergeMaxNumSegments != -1) { + return super.getIntraMergeExecutor(merge); + } else { + return null; + } + } + @Override /** Overridden to route specific MergeThread messages to our logger. */ protected void message(String message) { diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index 9476c3e719e0b..2ff81f9cc1bb6 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -629,7 +629,8 @@ private static void postProcessDynamicArrayMapping(DocumentParserContext context DenseVectorFieldMapper.Builder builder = new DenseVectorFieldMapper.Builder( fieldName, - context.indexSettings().getIndexVersionCreated() + context.indexSettings().getIndexVersionCreated(), + context.indexSettings().getMergeSchedulerConfig().getMaxThreadCount() ); DenseVectorFieldMapper denseVectorFieldMapper = builder.build(builderContext); context.updateDynamicMappers(fullFieldName, List.of(denseVectorFieldMapper)); diff --git a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java index 53e5feac1f563..942b4aa02bbe2 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/vectors/DenseVectorFieldMapper.java @@ -159,10 +159,12 @@ public static class Builder extends FieldMapper.Builder { private final Parameter> meta = Parameter.metaParam(); final IndexVersion indexVersionCreated; + final int mergeThreadCount; - public Builder(String name, IndexVersion indexVersionCreated) { + public Builder(String name, IndexVersion indexVersionCreated, int mergeThreadCount) { super(name); this.indexVersionCreated = indexVersionCreated; + 
this.mergeThreadCount = mergeThreadCount; final boolean indexedByDefault = indexVersionCreated.onOrAfter(INDEXED_BY_DEFAULT_INDEX_VERSION); final boolean defaultInt8Hnsw = indexVersionCreated.onOrAfter(DEFAULT_DENSE_VECTOR_TO_INT8_HNSW); this.indexed = Parameter.indexParam(m -> toType(m).fieldType().indexed, indexedByDefault); @@ -255,6 +257,7 @@ public DenseVectorFieldMapper build(MapperBuilderContext context) { ), indexOptions.getValue(), indexVersionCreated, + mergeThreadCount, multiFieldsBuilder.build(this, context), copyTo ); @@ -838,7 +841,7 @@ private abstract static class IndexOptions implements ToXContent { this.type = type; } - abstract KnnVectorsFormat getVectorsFormat(); + abstract KnnVectorsFormat getVectorsFormat(int mergeThreadCount); boolean supportsElementType(ElementType elementType) { return true; @@ -938,7 +941,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - KnnVectorsFormat getVectorsFormat() { + KnnVectorsFormat getVectorsFormat(int mergeThreadCount) { return new ES813Int8FlatVectorFormat(confidenceInterval); } @@ -976,7 +979,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } @Override - KnnVectorsFormat getVectorsFormat() { + KnnVectorsFormat getVectorsFormat(int mergeThreadCount) { return new ES813FlatVectorFormat(); } @@ -1005,10 +1008,10 @@ private Int8HnswIndexOptions(int m, int efConstruction, Float confidenceInterval } @Override - public KnnVectorsFormat getVectorsFormat() { + public KnnVectorsFormat getVectorsFormat(int mergeThreadCount) { // int bits = 7; // boolean compress = false; // TODO we only support 7 and false, for now - return new ES814HnswScalarQuantizedVectorsFormat(m, efConstruction, 1, confidenceInterval, null); + return new ES814HnswScalarQuantizedVectorsFormat(m, efConstruction, mergeThreadCount, confidenceInterval, null); } @Override @@ -1067,8 +1070,8 @@ private HnswIndexOptions(int m, int efConstruction) { } @Override - public 
KnnVectorsFormat getVectorsFormat() { - return new Lucene99HnswVectorsFormat(m, efConstruction, 1, null); + public KnnVectorsFormat getVectorsFormat(int mergeThreadCount) { + return new Lucene99HnswVectorsFormat(m, efConstruction, mergeThreadCount, null); } @Override @@ -1101,7 +1104,7 @@ public String toString() { } public static final TypeParser PARSER = new TypeParser( - (n, c) -> new Builder(n, c.indexVersionCreated()), + (n, c) -> new Builder(n, c.indexVersionCreated(), c.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()), notInMultiFields(CONTENT_TYPE) ); @@ -1394,18 +1397,21 @@ ElementType getElementType() { private final IndexOptions indexOptions; private final IndexVersion indexCreatedVersion; + private final int mergeThreadCount; private DenseVectorFieldMapper( String simpleName, MappedFieldType mappedFieldType, IndexOptions indexOptions, IndexVersion indexCreatedVersion, + int mergeThreadCount, MultiFields multiFields, CopyTo copyTo ) { super(simpleName, mappedFieldType, multiFields, copyTo); this.indexOptions = indexOptions; this.indexCreatedVersion = indexCreatedVersion; + this.mergeThreadCount = mergeThreadCount; } @Override @@ -1448,6 +1454,7 @@ public void parse(DocumentParserContext context) throws IOException { updatedDenseVectorFieldType, indexOptions, indexCreatedVersion, + mergeThreadCount, multiFields(), copyTo ); @@ -1535,7 +1542,7 @@ protected String contentType() { @Override public FieldMapper.Builder getMergeBuilder() { - return new Builder(simpleName(), indexCreatedVersion).init(this); + return new Builder(simpleName(), indexCreatedVersion, mergeThreadCount).init(this); } private static IndexOptions parseIndexOptions(String fieldName, Object propNode) { @@ -1560,7 +1567,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(KnnVectorsFormat defaultForm if (indexOptions == null) { format = defaultFormat; } else { - format = indexOptions.getVectorsFormat(); + format = indexOptions.getVectorsFormat(mergeThreadCount); } // 
It's legal to reuse the same format name as this is the same on-disk format. return new KnnVectorsFormat(format.getName()) { diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java index c4293d16ce6a4..1335bd20a43d3 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/mapper/SemanticTextFieldMapper.java @@ -65,12 +65,13 @@ public class SemanticTextFieldMapper extends FieldMapper implements InferenceFie public static final String CONTENT_TYPE = "semantic_text"; public static final TypeParser PARSER = new TypeParser( - (n, c) -> new Builder(n, c.indexVersionCreated()), + (n, c) -> new Builder(n, c.indexVersionCreated(), c.getIndexSettings().getMergeSchedulerConfig().getMaxThreadCount()), notInMultiFields(CONTENT_TYPE) ); public static class Builder extends FieldMapper.Builder { private final IndexVersion indexVersionCreated; + private final int mergeThreadCount; private final Parameter inferenceId = Parameter.stringParam( "inference_id", @@ -97,10 +98,11 @@ public static class Builder extends FieldMapper.Builder { private Function inferenceFieldBuilder; - public Builder(String name, IndexVersion indexVersionCreated) { + public Builder(String name, IndexVersion indexVersionCreated, int mergeThreadCount) { super(name); this.indexVersionCreated = indexVersionCreated; - this.inferenceFieldBuilder = c -> createInferenceField(c, indexVersionCreated, modelSettings.get()); + this.mergeThreadCount = mergeThreadCount; + this.inferenceFieldBuilder = c -> createInferenceField(c, indexVersionCreated, mergeThreadCount, modelSettings.get()); } public Builder setInferenceId(String id) { @@ -148,6 +150,7 @@ public SemanticTextFieldMapper build(MapperBuilderContext 
context) { modelSettings.getValue(), inferenceField, indexVersionCreated, + mergeThreadCount, meta.getValue() ), copyTo @@ -168,7 +171,7 @@ public Iterator iterator() { @Override public FieldMapper.Builder getMergeBuilder() { - return new Builder(simpleName(), fieldType().indexVersionCreated).init(this); + return new Builder(simpleName(), fieldType().indexVersionCreated, fieldType().mergeThreadCount).init(this); } @Override @@ -203,7 +206,7 @@ protected void parseCreateField(DocumentParserContext context) throws IOExceptio final SemanticTextFieldMapper mapper; if (fieldType().getModelSettings() == null) { context.path().remove(); - Builder builder = (Builder) new Builder(simpleName(), fieldType().indexVersionCreated).init(this); + Builder builder = (Builder) new Builder(simpleName(), fieldType().indexVersionCreated, fieldType().mergeThreadCount).init(this); try { mapper = builder.setModelSettings(field.inference().modelSettings()) .setInferenceId(field.inference().inferenceId()) @@ -270,6 +273,7 @@ public static class SemanticTextFieldType extends SimpleMappedFieldType { private final SemanticTextField.ModelSettings modelSettings; private final ObjectMapper inferenceField; private final IndexVersion indexVersionCreated; + private final int mergeThreadCount; public SemanticTextFieldType( String name, @@ -277,6 +281,7 @@ public SemanticTextFieldType( SemanticTextField.ModelSettings modelSettings, ObjectMapper inferenceField, IndexVersion indexVersionCreated, + int mergeThreadCount, Map meta ) { super(name, false, false, false, TextSearchInfo.NONE, meta); @@ -284,6 +289,7 @@ public SemanticTextFieldType( this.modelSettings = modelSettings; this.inferenceField = inferenceField; this.indexVersionCreated = indexVersionCreated; + this.mergeThreadCount = mergeThreadCount; } @Override @@ -331,15 +337,17 @@ public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext private static ObjectMapper createInferenceField( MapperBuilderContext context, 
IndexVersion indexVersionCreated, + int mergeThreadCount, @Nullable SemanticTextField.ModelSettings modelSettings ) { return new ObjectMapper.Builder(INFERENCE_FIELD, Explicit.EXPLICIT_TRUE).dynamic(ObjectMapper.Dynamic.FALSE) - .add(createChunksField(indexVersionCreated, modelSettings)) + .add(createChunksField(indexVersionCreated, mergeThreadCount, modelSettings)) .build(context); } private static NestedObjectMapper.Builder createChunksField( IndexVersion indexVersionCreated, + int mergeThreadCount, SemanticTextField.ModelSettings modelSettings ) { NestedObjectMapper.Builder chunksField = new NestedObjectMapper.Builder(CHUNKS_FIELD, indexVersionCreated); @@ -347,19 +355,24 @@ private static NestedObjectMapper.Builder createChunksField( KeywordFieldMapper.Builder chunkTextField = new KeywordFieldMapper.Builder(CHUNKED_TEXT_FIELD, indexVersionCreated).indexed(false) .docValues(false); if (modelSettings != null) { - chunksField.add(createEmbeddingsField(indexVersionCreated, modelSettings)); + chunksField.add(createEmbeddingsField(indexVersionCreated, mergeThreadCount, modelSettings)); } chunksField.add(chunkTextField); return chunksField; } - private static Mapper.Builder createEmbeddingsField(IndexVersion indexVersionCreated, SemanticTextField.ModelSettings modelSettings) { + private static Mapper.Builder createEmbeddingsField( + IndexVersion indexVersionCreated, + int mergeThreadCount, + SemanticTextField.ModelSettings modelSettings + ) { return switch (modelSettings.taskType()) { case SPARSE_EMBEDDING -> new SparseVectorFieldMapper.Builder(CHUNKED_EMBEDDINGS_FIELD); case TEXT_EMBEDDING -> { DenseVectorFieldMapper.Builder denseVectorMapperBuilder = new DenseVectorFieldMapper.Builder( CHUNKED_EMBEDDINGS_FIELD, - indexVersionCreated + indexVersionCreated, + mergeThreadCount ); SimilarityMeasure similarity = modelSettings.similarity(); if (similarity != null) {