Skip to content

Commit 4f7ea81

Browse files
authored
Adjust translog operation assertions for synthetic source (#119330) (#119559)
When synthetic sources are used in peer recoveries, the translog operations via peer recoveries may differ from those created through replication. This change relaxes the translog operation assertion to account for synthetic source, allowing these operations to be considered equivalent. Closes #119191
1 parent ba4b1b2 commit 4f7ea81

File tree

14 files changed

+428
-114
lines changed

14 files changed

+428
-114
lines changed

muted-tests.yml

-3
Original file line numberDiff line numberDiff line change
@@ -416,9 +416,6 @@ tests:
416416
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
417417
method: test {p0=synonyms/90_synonyms_reloading_for_synset/Reload analyzers for specific synonym set}
418418
issue: https://github.com/elastic/elasticsearch/issues/116777
419-
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
420-
issue: https://github.com/elastic/elasticsearch/issues/119191
421-
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
422419
- class: org.elasticsearch.xpack.ml.integration.InferenceIngestInputConfigIT
423420
method: testIngestWithInputFields
424421
issue: https://github.com/elastic/elasticsearch/issues/118092

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,8 @@ private Translog openTranslog(
661661
translogDeletionPolicy,
662662
globalCheckpointSupplier,
663663
engineConfig.getPrimaryTermSupplier(),
664-
persistedSequenceNumberConsumer
664+
persistedSequenceNumberConsumer,
665+
TranslogOperationAsserter.withEngineConfig(engineConfig)
665666
);
666667
}
667668

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,8 @@ public void trimUnreferencedTranslogFiles() {
166166
translogDeletionPolicy,
167167
engineConfig.getGlobalCheckpointSupplier(),
168168
engineConfig.getPrimaryTermSupplier(),
169-
seqNo -> {}
169+
seqNo -> {},
170+
TranslogOperationAsserter.DEFAULT
170171
)
171172
) {
172173
translog.trimUnreferencedReaders();

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
266266
translogDeletionPolicy,
267267
config.getGlobalCheckpointSupplier(),
268268
config.getPrimaryTermSupplier(),
269-
seqNo -> {}
269+
seqNo -> {},
270+
TranslogOperationAsserter.DEFAULT
270271
)
271272
) {
272273
return translog.stats();

server/src/main/java/org/elasticsearch/index/engine/TranslogDirectoryReader.java

+41-38
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ private static UnsupportedOperationException unsupported() {
9898
return new UnsupportedOperationException();
9999
}
100100

101-
public TranslogLeafReader getLeafReader() {
102-
return leafReader;
103-
}
104-
105101
@Override
106102
protected DirectoryReader doOpenIfChanged() {
107103
throw unsupported();
@@ -142,6 +138,45 @@ public CacheHelper getReaderCacheHelper() {
142138
return leafReader.getReaderCacheHelper();
143139
}
144140

141+
static DirectoryReader createInMemoryReader(
142+
ShardId shardId,
143+
EngineConfig engineConfig,
144+
Directory directory,
145+
DocumentParser documentParser,
146+
MappingLookup mappingLookup,
147+
Translog.Index operation
148+
) {
149+
final ParsedDocument parsedDocs = documentParser.parseDocument(
150+
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
151+
mappingLookup
152+
);
153+
154+
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
155+
parsedDocs.version().setLongValue(operation.version());
156+
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
157+
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
158+
IndexWriterConfig.OpenMode.CREATE
159+
).setCodec(engineConfig.getCodec());
160+
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
161+
writer.addDocument(parsedDocs.rootDoc());
162+
final DirectoryReader reader = open(writer);
163+
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
164+
reader.close();
165+
throw new IllegalStateException(
166+
"Expected a single document segment; "
167+
+ "but ["
168+
+ reader.leaves().size()
169+
+ " segments with "
170+
+ reader.leaves().get(0).reader().numDocs()
171+
+ " documents"
172+
);
173+
}
174+
return reader;
175+
} catch (IOException e) {
176+
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
177+
}
178+
}
179+
145180
private static class TranslogLeafReader extends LeafReader {
146181

147182
private static final FieldInfo FAKE_SOURCE_FIELD = new FieldInfo(
@@ -240,7 +275,8 @@ private LeafReader getDelegate() {
240275
ensureOpen();
241276
reader = delegate.get();
242277
if (reader == null) {
243-
reader = createInMemoryLeafReader();
278+
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
279+
reader = indexReader.leaves().get(0).reader();
244280
final LeafReader existing = delegate.getAndSet(reader);
245281
assert existing == null;
246282
onSegmentCreated.run();
@@ -250,39 +286,6 @@ private LeafReader getDelegate() {
250286
return reader;
251287
}
252288

253-
private LeafReader createInMemoryLeafReader() {
254-
assert Thread.holdsLock(this);
255-
final ParsedDocument parsedDocs = documentParser.parseDocument(
256-
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
257-
mappingLookup
258-
);
259-
260-
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
261-
parsedDocs.version().setLongValue(operation.version());
262-
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
263-
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
264-
IndexWriterConfig.OpenMode.CREATE
265-
).setCodec(engineConfig.getCodec());
266-
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
267-
writer.addDocument(parsedDocs.rootDoc());
268-
final DirectoryReader reader = open(writer);
269-
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
270-
reader.close();
271-
throw new IllegalStateException(
272-
"Expected a single document segment; "
273-
+ "but ["
274-
+ reader.leaves().size()
275-
+ " segments with "
276-
+ reader.leaves().get(0).reader().numDocs()
277-
+ " documents"
278-
);
279-
}
280-
return reader.leaves().get(0).reader();
281-
} catch (IOException e) {
282-
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
283-
}
284-
}
285-
286289
@Override
287290
public CacheHelper getCoreCacheHelper() {
288291
return getDelegate().getCoreCacheHelper();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.search.similarities.BM25Similarity;
13+
import org.apache.lucene.store.ByteBuffersDirectory;
14+
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
15+
import org.elasticsearch.index.mapper.DocumentParser;
16+
import org.elasticsearch.index.mapper.MappingLookup;
17+
import org.elasticsearch.index.shard.ShardId;
18+
import org.elasticsearch.index.translog.Translog;
19+
20+
import java.io.IOException;
21+
22+
/**
23+
*
24+
* A utility class to assert that translog operations with the same sequence number
25+
* in the same generation are either identical or equivalent when synthetic sources are used.
26+
*/
27+
public abstract class TranslogOperationAsserter {
28+
public static final TranslogOperationAsserter DEFAULT = new TranslogOperationAsserter() {
29+
};
30+
31+
private TranslogOperationAsserter() {
32+
33+
}
34+
35+
public static TranslogOperationAsserter withEngineConfig(EngineConfig engineConfig) {
36+
return new TranslogOperationAsserter() {
37+
@Override
38+
public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
39+
if (super.assertSameIndexOperation(o1, o2)) {
40+
return true;
41+
}
42+
if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
43+
return super.assertSameIndexOperation(synthesizeSource(engineConfig, o1), o2)
44+
|| super.assertSameIndexOperation(o1, synthesizeSource(engineConfig, o2));
45+
}
46+
return false;
47+
}
48+
};
49+
}
50+
51+
static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index op) throws IOException {
52+
final ShardId shardId = engineConfig.getShardId();
53+
final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
54+
final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
55+
try (
56+
var directory = new ByteBuffersDirectory();
57+
var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op)
58+
) {
59+
final Engine.Searcher searcher = new Engine.Searcher(
60+
"assert_translog",
61+
reader,
62+
new BM25Similarity(),
63+
null,
64+
TrivialQueryCachingPolicy.NEVER,
65+
() -> {}
66+
);
67+
try (
68+
LuceneSyntheticSourceChangesSnapshot snapshot = new LuceneSyntheticSourceChangesSnapshot(
69+
mappingLookup,
70+
searcher,
71+
LuceneSyntheticSourceChangesSnapshot.DEFAULT_BATCH_SIZE,
72+
Integer.MAX_VALUE,
73+
op.seqNo(),
74+
op.seqNo(),
75+
true,
76+
false,
77+
engineConfig.getIndexSettings().getIndexVersionCreated()
78+
)
79+
) {
80+
final Translog.Operation normalized = snapshot.next();
81+
assert normalized != null : "expected one operation; got zero";
82+
return (Translog.Index) normalized;
83+
}
84+
}
85+
}
86+
87+
public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
88+
return Translog.Index.equalsWithoutAutoGeneratedTimestamp(o1, o2);
89+
}
90+
}

server/src/main/java/org/elasticsearch/index/translog/Translog.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.core.Releasable;
2828
import org.elasticsearch.index.IndexSettings;
2929
import org.elasticsearch.index.engine.Engine;
30+
import org.elasticsearch.index.engine.TranslogOperationAsserter;
3031
import org.elasticsearch.index.mapper.IdFieldMapper;
3132
import org.elasticsearch.index.mapper.MapperService;
3233
import org.elasticsearch.index.mapper.Uid;
@@ -123,6 +124,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
123124
private final TranslogDeletionPolicy deletionPolicy;
124125
private final LongConsumer persistedSequenceNumberConsumer;
125126
private final OperationListener operationListener;
127+
private final TranslogOperationAsserter operationAsserter;
126128

127129
/**
128130
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
@@ -150,14 +152,16 @@ public Translog(
150152
TranslogDeletionPolicy deletionPolicy,
151153
final LongSupplier globalCheckpointSupplier,
152154
final LongSupplier primaryTermSupplier,
153-
final LongConsumer persistedSequenceNumberConsumer
155+
final LongConsumer persistedSequenceNumberConsumer,
156+
final TranslogOperationAsserter operationAsserter
154157
) throws IOException {
155158
super(config.getShardId(), config.getIndexSettings());
156159
this.config = config;
157160
this.globalCheckpointSupplier = globalCheckpointSupplier;
158161
this.primaryTermSupplier = primaryTermSupplier;
159162
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
160163
this.operationListener = config.getOperationListener();
164+
this.operationAsserter = operationAsserter;
161165
this.deletionPolicy = deletionPolicy;
162166
this.translogUUID = translogUUID;
163167
this.bigArrays = config.getBigArrays();
@@ -582,6 +586,7 @@ TranslogWriter createWriter(
582586
bigArrays,
583587
diskIoBufferPool,
584588
operationListener,
589+
operationAsserter,
585590
config.fsync()
586591
);
587592
} catch (final IOException e) {
@@ -1265,17 +1270,8 @@ public boolean equals(Object o) {
12651270
return false;
12661271
}
12671272

1268-
Index index = (Index) o;
1269-
1270-
if (version != index.version
1271-
|| seqNo != index.seqNo
1272-
|| primaryTerm != index.primaryTerm
1273-
|| id.equals(index.id) == false
1274-
|| autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp
1275-
|| source.equals(index.source) == false) {
1276-
return false;
1277-
}
1278-
return Objects.equals(routing, index.routing);
1273+
Index other = (Index) o;
1274+
return autoGeneratedIdTimestamp == other.autoGeneratedIdTimestamp && equalsWithoutAutoGeneratedTimestamp(this, other);
12791275
}
12801276

12811277
@Override
@@ -1311,6 +1307,15 @@ public long getAutoGeneratedIdTimestamp() {
13111307
return autoGeneratedIdTimestamp;
13121308
}
13131309

1310+
public static boolean equalsWithoutAutoGeneratedTimestamp(Translog.Index o1, Translog.Index o2) {
1311+
return o1.version == o2.version
1312+
&& o1.seqNo == o2.seqNo
1313+
&& o1.primaryTerm == o2.primaryTerm
1314+
&& o1.id.equals(o2.id)
1315+
&& o1.source.equals(o2.source)
1316+
&& Objects.equals(o1.routing, o2.routing);
1317+
}
1318+
13141319
}
13151320

13161321
public static final class Delete extends Operation {
@@ -1958,6 +1963,7 @@ public static String createEmptyTranslog(
19581963
BigArrays.NON_RECYCLING_INSTANCE,
19591964
DiskIoBufferPool.INSTANCE,
19601965
TranslogConfig.NOOP_OPERATION_LISTENER,
1966+
TranslogOperationAsserter.DEFAULT,
19611967
true
19621968
);
19631969
writer.close();

0 commit comments

Comments
 (0)