Skip to content

Revive read/write engine lock to guard operations against resets #126311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
acquire read lock before refresh
  • Loading branch information
tlrx committed Apr 8, 2025
commit 6e4a5decfa2c5b812c753a6fe7de18f84afe33a3
30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
Expand Down Expand Up @@ -2384,4 +2385,33 @@ public void prepareForEngineReset() throws IOException {
public long getLastUnsafeSegmentGenerationForGets() {
    // This implementation never returns a value: it unconditionally rejects the call.
    final String reason = "Doesn't support getting the latest segment generation";
    throw new UnsupportedOperationException(reason);
}

/**
 * Registers an {@link AssertRefreshListenerHoldsEngineReadLock} on the given reference manager, but only when
 * {@link Assertions#ENABLED} is true. Otherwise the reference manager is left untouched.
 *
 * @param referenceManager the reference manager to attach the asserting listener to
 * @param engineConfig     the engine configuration from which the engine lock is obtained
 * @param <R>              the concrete reference manager type, preserved in the return value
 * @return the very same {@code referenceManager} instance that was passed in
 */
protected static <R extends ReferenceManager<ElasticsearchDirectoryReader>> R wrapForAssertions(
    R referenceManager,
    EngineConfig engineConfig
) {
    if (Assertions.ENABLED == false) {
        return referenceManager;
    }
    final var lockToCheck = engineConfig.getEngineLock();
    referenceManager.addListener(new AssertRefreshListenerHoldsEngineReadLock(lockToCheck));
    return referenceManager;
}

/**
 * A {@link ReferenceManager.RefreshListener} whose callbacks contain nothing but an assertion: both before and after a
 * refresh, the calling thread must hold the engine read lock. The assertion failure detail is the offending thread.
 */
private static class AssertRefreshListenerHoldsEngineReadLock implements ReferenceManager.RefreshListener {

    private final EngineReadWriteLock readWriteLock;

    private AssertRefreshListenerHoldsEngineReadLock(EngineReadWriteLock engineLock) {
        // fail fast on a missing lock rather than at the first refresh
        this.readWriteLock = Objects.requireNonNull(engineLock);
    }

    @Override
    public void beforeRefresh() throws IOException {
        assertReadLockHeldByCurrentThread();
    }

    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        // the check is the same whether or not the refresh actually happened
        assertReadLockHeldByCurrentThread();
    }

    /** Single place for the check shared by both refresh callbacks. */
    private void assertReadLockHeldByCurrentThread() {
        assert readWriteLock.isReadLockedByCurrentThread() : Thread.currentThread();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ public InternalEngine(EngineConfig engineConfig) {
}
externalReaderManager = createReaderManager(new RefreshWarmerListener(logger, isClosed, engineConfig));
internalReaderManager = externalReaderManager.internalReaderManager;
this.internalReaderManager = internalReaderManager;
this.externalReaderManager = externalReaderManager;
this.internalReaderManager = wrapForAssertions(internalReaderManager, engineConfig);
this.externalReaderManager = wrapForAssertions(externalReaderManager, engineConfig);
internalReaderManager.addListener(versionMap);
this.lastUnsafeSegmentGenerationForGets = new AtomicLong(lastCommittedSegmentInfos.getGeneration());
assert pendingTranslogRecovery.get() == false : "translog recovery can't be pending before we set it";
Expand Down Expand Up @@ -2040,7 +2040,7 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
// both refresh types will result in an internal refresh but only the external will also
// pass the new reader reference to the external reader manager.
final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint();
boolean refreshed;
boolean refreshed = false;
long segmentGeneration = RefreshResult.UNKNOWN_GENERATION;
try {
// refresh does not need to hold readLock as ReferenceManager can handle correctly if the engine is closed in mid-way.
Expand All @@ -2051,12 +2051,30 @@ protected final RefreshResult refresh(String source, SearcherScope scope, boolea
// the second refresh will only do the extra work we have to do for warming caches etc.
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
long generationBeforeRefresh = lastCommittedSegmentInfos.getGeneration();

// The shard uses a reentrant read/write lock to guard against engine changes, a type of lock that prioritizes the threads
// waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
// sometimes access the engine read lock, we need to ensure that they won't block if another thread is waiting for the

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a6cad19

// engine write lock, so we acquire the read lock upfront before the refresh lock.
final var engineReadLock = engineConfig.getEngineLock().readLock();

// it is intentional that we never refresh both internal / external together
if (block) {
referenceManager.maybeRefreshBlocking();
refreshed = true;
engineReadLock.lock();
try {
referenceManager.maybeRefreshBlocking();
refreshed = true;
} finally {
engineReadLock.unlock();
}
} else {
refreshed = referenceManager.maybeRefresh();
if (engineReadLock.tryLock()) {
try {
refreshed = referenceManager.maybeRefresh();
} finally {
engineReadLock.unlock();
}
}
}
if (refreshed) {
final ElasticsearchDirectoryReader current = referenceManager.acquire();
Expand Down