Skip to content

Revive read/write engine lock to guard operations against resets #126311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 10, 2025
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
assertNoEngineResetLock
  • Loading branch information
tlrx committed Apr 8, 2025
commit eb83fdb1426c114cb5988aa1f8823fab1e1628fe
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,7 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
Engine.IndexCommitRef indexCommit = null;
store.incRef();
try {
assert assertNoEngineResetLock();
synchronized (engineMutex) {
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
Expand Down Expand Up @@ -1812,6 +1813,7 @@ public CacheHelper getReaderCacheHelper() {
}

public void close(String reason, boolean flushEngine, Executor closeExecutor, ActionListener<Void> closeListener) throws IOException {
assert assertNoEngineResetLock();
synchronized (engineMutex) {
engineResetLock.readLock().lock(); // prevent engine resets while closing
try {
Expand Down Expand Up @@ -1977,6 +1979,7 @@ private void doLocalRecovery(
// First, start a temporary engine, recover the local translog up to the given checkpoint, and then close the engine again.
.<Void>newForked(l -> ActionListener.runWithResource(ActionListener.assertOnce(l), () -> () -> {
assert Thread.holdsLock(mutex) == false : "must not hold the mutex here";
assert assertNoEngineResetLock();
synchronized (engineMutex) {
engineResetLock.readLock().lock(); // prevent engine resets while closing
try {
Expand Down Expand Up @@ -2199,6 +2202,7 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {

private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {
assert Thread.holdsLock(mutex) == false : "opening engine under mutex";
assert assertNoEngineResetLock();
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand Down Expand Up @@ -2303,6 +2307,7 @@ private void onNewEngine(Engine newEngine) {
*/
public void performRecoveryRestart() throws IOException {
assert Thread.holdsLock(mutex) == false : "restart recovery under mutex";
assert assertNoEngineResetLock();
synchronized (engineMutex) {
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
engineResetLock.readLock().lock(); // prevent engine resets while closing
Expand Down Expand Up @@ -4443,6 +4448,7 @@ public void afterRefresh(boolean didRefresh) {
public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert waitForEngineOrClosedShardListeners.isDone();
assert assertNoEngineResetLock();
Engine previousEngine = null;
try {
synchronized (engineMutex) {
Expand Down Expand Up @@ -4489,6 +4495,7 @@ public void resetEngine(Consumer<Engine> postResetNewEngineConsumer) {
*/
void rollbackEngineToGlobalCheckpoint() throws IOException {
assert Thread.holdsLock(mutex) == false : "resetting engine under mutex";
assert assertNoEngineResetLock();
assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "engine rollback without blocking operations; active operations are [" + getActiveOperationsCount() + ']';
sync(); // persist the global checkpoint to disk
Expand Down Expand Up @@ -4517,6 +4524,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
) {
@Override
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {
assert assertNoEngineResetLock();
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
Expand All @@ -4528,6 +4536,7 @@ public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) {

@Override
public IndexCommitRef acquireSafeIndexCommit() {
assert assertNoEngineResetLock();
synchronized (engineMutex) {
if (newEngineReference.get() == null) {
throw new AlreadyClosedException("engine was closed");
Expand All @@ -4539,6 +4548,7 @@ public IndexCommitRef acquireSafeIndexCommit() {
@Override
public void close() throws IOException {
Engine newEngine;
assert assertNoEngineResetLock();
synchronized (engineMutex) {
newEngine = newEngineReference.get();
if (newEngine == getCurrentEngine(true)) {
Expand Down Expand Up @@ -4692,4 +4702,16 @@ public void ensureMutable(ActionListener<Void> listener, boolean permitAcquired)
EngineResetLock getEngineResetLock() {
return engineResetLock;
}

private boolean assertNoEngineResetLock() {
assert engineResetLock.isReadLockedByCurrentThread()
: "Expected current thread ["
+ Thread.currentThread()
+ "] to not hold an engine read lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)";
assert engineResetLock.isWriteLockedByCurrentThread()
: "Expected current thread ["
+ Thread.currentThread()
+ "] to not hold the engine write lock (lock ordering should be: engineMutex -> engineResetLock -> mutex)";
return true;
}
}