Skip to content

Revive read/write engine lock to guard operations against resets #126311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ public void testInvalidateIndicesRequestCacheWhenRollbackEngine() throws Excepti
final CountDownLatch engineResetLatch = new CountDownLatch(1);
shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
try {
shard.resetEngineToGlobalCheckpoint();
shard.rollbackEngineToGlobalCheckpoint();
} finally {
r.close();
engineResetLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getRelativeTimeInNanosSupplier(),
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService()
config.getMapperService(),
config.getEngineResetLock()
);
}

Expand Down
38 changes: 36 additions & 2 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
Expand All @@ -76,6 +77,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.DenseVectorStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.EngineResetLock;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardFieldStats;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -1288,7 +1290,7 @@ public void externalRefresh(String source, ActionListener<Engine.RefreshResult>

/**
* Asynchronously refreshes the engine for new search operations to reflect the latest
* changes unless another thread is already refreshing the engine concurrently.
* changes unless another thread is already refreshing or resetting the engine concurrently.
*/
@Nullable
public abstract void maybeRefresh(String source, ActionListener<RefreshResult> listener) throws EngineException;
Expand Down Expand Up @@ -2371,7 +2373,7 @@ public record FlushResult(boolean flushPerformed, long generation) {
}

/**
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine()}.
* Ensures the engine is in a state that it can be closed by a call to {@link IndexShard#resetEngine(Consumer)}.
*
* In general, resetting the engine should be done with care, to consider any
* in-progress operations and listeners (e.g., primary term and generation listeners).
Expand All @@ -2384,4 +2386,36 @@ public void prepareForEngineReset() throws IOException {
public long getLastUnsafeSegmentGenerationForGets() {
    // This implementation unconditionally rejects the call; it never returns a value.
    final String reason = "Doesn't support getting the latest segment generation";
    throw new UnsupportedOperationException(reason);
}

/**
 * Attaches an assertion-only refresh listener to the given reference manager.
 * <p>
 * When {@link Assertions#ENABLED} is {@code true}, an {@link AssertRefreshListenerHoldsEngineReadLock}
 * built from {@link EngineConfig#getEngineResetLock()} is registered on {@code referenceManager}.
 * When it is {@code false}, nothing is registered. Despite the method name, no wrapper object is
 * created: the very same instance that was passed in is returned in both cases.
 *
 * @param referenceManager the reference manager to (optionally) register the listener on
 * @param engineConfig     the engine configuration providing the engine reset lock
 * @param <R>              the concrete reference manager type, preserved for the caller
 * @return {@code referenceManager} itself
 */
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(
    R referenceManager,
    EngineConfig engineConfig
) {
    if (Assertions.ENABLED == false) {
        // Nothing to verify: leave the reference manager untouched.
        return referenceManager;
    }
    final EngineResetLock resetLock = engineConfig.getEngineResetLock();
    referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(resetLock));
    return referenceManager;
}

/**
 * A {@link ReferenceManager.RefreshListener} that asserts, both before and after every refresh,
 * that the current thread holds the engine read lock. The checks are plain {@code assert}
 * statements whose failure message is the current thread.
 */
private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener {

    private final EngineResetLock engineLock;

    /**
     * @param engineLock the lock whose read side must be held while refreshing; must not be {@code null}
     * @throws NullPointerException if {@code engineLock} is {@code null}
     */
    private AssertRefreshListenerHoldsEngineReadLock(EngineResetLock engineLock) {
        this.engineLock = Objects.requireNonNull(engineLock);
    }

    @Override
    public void beforeRefresh() throws IOException {
        assertReadLockHeld();
    }

    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        // The outcome of the refresh is irrelevant here: the lock must be held either way.
        assertReadLockHeld();
    }

    /** Shared check used by both callbacks. */
    private void assertReadLockHeld() {
        assert engineLock.isReadLockedByCurrentThread() : Thread.currentThread();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.EngineResetLock;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -146,6 +147,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final boolean promotableToPrimary;

private final EngineResetLock engineResetLock;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand Down Expand Up @@ -177,7 +180,8 @@ public EngineConfig(
LongSupplier relativeTimeInNanosSupplier,
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary,
MapperService mapperService
MapperService mapperService,
EngineResetLock engineResetLock
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -224,6 +228,7 @@ public EngineConfig(
this.promotableToPrimary = promotableToPrimary;
// always use compound on flush - reduces # of file-handles on refresh
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.engineResetLock = engineResetLock;
}

/**
Expand Down Expand Up @@ -468,4 +473,8 @@ public boolean getUseCompoundFile() {
public MapperService getMapperService() {
    // Plain accessor: hands back the mapper service field of this configuration as-is.
    return this.mapperService;
}

/**
 * Returns the {@link EngineResetLock} that was passed to this configuration's constructor.
 * <p>
 * NOTE(review): the constructor assignment shown in this change performs no null check, so this
 * may presumably return {@code null} if a caller supplied {@code null} — confirm against callers.
 *
 * @return the engine reset lock stored in this configuration
 */
public EngineResetLock getEngineResetLock() {
    return this.engineResetLock;
}
}
Loading