
Revive read/write engine lock to guard operations against resets #126311

Merged: 26 commits, Apr 10, 2025

Conversation

@tlrx (Member) commented Apr 4, 2025

Draft pull request that re-introduces the engine read/write lock to guard against engine resets.

It differs from #124635 in the following ways:

  • uses the engineMutex for creating/closing engines
  • uses the reentrant r/w lock for retaining engine instances and for resetting the engine (see the sketch after the to-do list below)
  • acquires the reentrant read lock during refreshes to prevent deadlocks during resets
  • adds tests to ensure no deadlock when re-acquiring the read lock in refresh listeners

To do:

  • add test for concurrent resets and engine acquisitions
  • replace getEngine with withEngine() in most places?
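
A minimal sketch of the locking scheme described above (illustration only; the field names engineMutex and engineResetLock follow this PR, but the code below is not the actual IndexShard implementation):

import java.util.concurrent.locks.ReentrantReadWriteLock;

class EngineLockingSketch {
    private final Object engineMutex = new Object();   // serializes creating/closing engines
    private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
    private volatile Object currentEngine;             // stands in for the Engine instance

    // Operations that need the engine retain it under the (reentrant) read lock; the same
    // read lock is taken around refreshes so refresh listeners can safely re-enter it.
    void useEngine() {
        engineResetLock.readLock().lock();
        try {
            Object engine = currentEngine;   // null once the shard is closed
            // ... index, refresh, flush, collect stats on the retained engine ...
        } finally {
            engineResetLock.readLock().unlock();
        }
    }

    // A reset swaps the engine under the write lock, excluding every retained use above;
    // creation, closing and resets are additionally serialized by engineMutex.
    void resetEngine() {
        synchronized (engineMutex) {
            engineResetLock.writeLock().lock();
            try {
                // prepareForEngineReset(); close the old engine; open a new one
                currentEngine = new Object();
            } finally {
                engineResetLock.writeLock().unlock();
            }
        }
    }
}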

@elasticsearchmachine added the needs:triage, v9.1.0 and serverless-linked labels on Apr 4, 2025
@tlrx added the >non-issue and :Distributed Indexing/Engine labels on Apr 4, 2025
@elasticsearchmachine added the Team:Distributed Indexing label and removed the needs:triage label on Apr 4, 2025
@elasticsearchmachine (Collaborator) commented:

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@henningandersen (Contributor) left a comment:

This direction looks good. Only reviewed production parts for now. Left a few comments but nothing major.


// The shard uses a reentrant read/write lock to guard against engine changes, a type of lock that prioritizes the threads
// waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the

Suggested change
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
// sometimes access the engine read lock, we need to ensure that they won't block if another thread is waiting for the

@tlrx (Member Author):

I pushed a6cad19
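
For context, a minimal illustration (not the PR's code) of the reentrancy property the comment above relies on: a thread that already holds the read lock may re-acquire it even while a writer is queued, whereas a thread with no read hold would wait behind the writer. This is why refresh listeners that run under the refresh's read lock cannot deadlock against a pending engine reset.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RefreshListenerReentrancy {
    public static void main(String[] args) {
        ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();

        engineResetLock.readLock().lock();       // taken before running refresh listeners
        try {
            // ... a listener fires and touches the engine, taking the read lock again:
            engineResetLock.readLock().lock();   // reentrant acquisition succeeds even with a waiting writer
            try {
                System.out.println("listener ran under a reentrant read lock");
            } finally {
                engineResetLock.readLock().unlock();
            }
        } finally {
            engineResetLock.readLock().unlock();
        }
    }
}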

final long startTime = System.nanoTime();
// Acquire an engine read lock before the flush lock. If we were not acquiring a read lock here, a concurrent engine reset could
// hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current
// thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock).
@tlrx (Member Author):

It was detected by a test failure; see this thread dump:



Found one Java-level deadlock:
=============================
"elasticsearch[node_t3][clusterApplierService#updateTask][T#1]":
  waiting for ownable synchronizer 0x00000000e1ff06e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "elasticsearch[node_t3][generic][T#2]"

"elasticsearch[node_t3][generic][T#2]":
  waiting for ownable synchronizer 0x00000000e3a10a30, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "elasticsearch[node_t3][force_merge][T#1]"

"elasticsearch[node_t3][force_merge][T#1]":
  waiting for ownable synchronizer 0x00000000e1ff06e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "elasticsearch[node_t3][generic][T#2]"

Java stack information for the threads listed above:
===================================================
"elasticsearch[node_t3][clusterApplierService#updateTask][T#1]":
	at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
	- parking to wait for  <0x00000000e1ff06e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(java.base@24/AbstractQueuedSynchronizer.java:1118)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(java.base@24/ReentrantReadWriteLock.java:739)
	at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3350)
	at org.elasticsearch.index.shard.IndexShard.commitStats(IndexShard.java:1374)
	at org.elasticsearch.index.IndexService.getNodeMappingStats(IndexService.java:356)
	at co.elastic.elasticsearch.stateless.autoscaling.memory.ShardsMappingSizeCollector.updateMappingMetricsForAllIndices(ShardsMappingSizeCollector.java:277)
	at co.elastic.elasticsearch.stateless.autoscaling.memory.ShardsMappingSizeCollector.clusterChanged(ShardsMappingSizeCollector.java:205)
	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateListener(ClusterApplierService.java:594)
	at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateListeners(ClusterApplierService.java:579)
	at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:538)
	at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
	at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:977)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
	at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
	at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
	at java.lang.Thread.run(java.base@24/Thread.java:1447)
"elasticsearch[node_t3][generic][T#2]":
	at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
	- parking to wait for  <0x00000000e3a10a30> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:1029)
	at java.util.concurrent.locks.ReentrantLock$Sync.lock(java.base@24/ReentrantLock.java:154)
	at java.util.concurrent.locks.ReentrantLock.lock(java.base@24/ReentrantLock.java:323)
	at org.elasticsearch.index.engine.InternalEngine.flushHoldingLock(InternalEngine.java:2220)
	at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHoldingLock(IndexEngine.java:336)
	at org.elasticsearch.index.engine.Engine.flush(Engine.java:1334)
	at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHollow(IndexEngine.java:398)
	at co.elastic.elasticsearch.stateless.engine.IndexEngine.prepareForEngineReset(IndexEngine.java:458)
	at org.elasticsearch.index.shard.IndexShard.resetEngine(IndexShard.java:4452)
	- locked <0x00000000e3a10a20> (a java.lang.Object)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$handleStartRelocationWithFreshClusterState$13(TransportStatelessPrimaryRelocationAction.java:305)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcbae6dd0.accept(Unknown Source)
	at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:852)
	at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:803)
	at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:413)
	at org.elasticsearch.index.shard.IndexShardOperationPermits$1.doRun(IndexShardOperationPermits.java:119)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
	at org.elasticsearch.common.util.concurrent.EsExecutors$DirectExecutorService.execute(EsExecutors.java:316)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.waitUntilBlocked(IndexShardOperationPermits.java:94)
	at org.elasticsearch.index.shard.IndexShardOperationPermits.blockOperations(IndexShardOperationPermits.java:90)
	at org.elasticsearch.index.shard.IndexShard.relocated(IndexShard.java:803)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$handleStartRelocationWithFreshClusterState$14(TransportStatelessPrimaryRelocationAction.java:282)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcbae5d78.accept(Unknown Source)
	at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:261)
	at org.elasticsearch.action.support.SubscribableListener$SuccessResult.complete(SubscribableListener.java:394)
	at org.elasticsearch.action.support.SubscribableListener.tryComplete(SubscribableListener.java:314)
	at org.elasticsearch.action.support.SubscribableListener.addListener(SubscribableListener.java:210)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.handleStartRelocationWithFreshClusterState(TransportStatelessPrimaryRelocationAction.java:276)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$1.accept(TransportStatelessPrimaryRelocationAction.java:222)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$1.accept(TransportStatelessPrimaryRelocationAction.java:219)
	at org.elasticsearch.indices.recovery.PeerRecoverySourceClusterStateDelay.ensureClusterStateVersion(PeerRecoverySourceClusterStateDelay.java:42)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.handleStartRelocation(TransportStatelessPrimaryRelocationAction.java:210)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$new$2(TransportStatelessPrimaryRelocationAction.java:166)
	at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcb7872d8.messageReceived(Unknown Source)
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:90)
	at org.elasticsearch.transport.InboundHandler.doHandleRequest(InboundHandler.java:289)
	at org.elasticsearch.transport.InboundHandler$1.doRun(InboundHandler.java:302)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1044)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
	at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
	at java.lang.Thread.run(java.base@24/Thread.java:1447)
"elasticsearch[node_t3][force_merge][T#1]":
	at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
	- parking to wait for  <0x00000000e1ff06e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
	at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(java.base@24/AbstractQueuedSynchronizer.java:1118)
	at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(java.base@24/ReentrantReadWriteLock.java:739)
	at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:2063)
	at org.elasticsearch.index.engine.InternalEngine.flushHoldingLock(InternalEngine.java:2253)
	at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHoldingLock(IndexEngine.java:336)
	at org.elasticsearch.index.engine.Engine.flush(Engine.java:1334)
	at org.elasticsearch.index.engine.Engine.flush(Engine.java:1315)
	at org.elasticsearch.index.engine.InternalEngine.forceMerge(InternalEngine.java:2457)
	at co.elastic.elasticsearch.stateless.engine.IndexEngine.forceMerge(IndexEngine.java:693)
	at org.elasticsearch.index.shard.IndexShard.forceMerge(IndexShard.java:1575)
	at org.elasticsearch.action.admin.indices.forcemerge.TransportForceMergeAction.lambda$shardOperation$3(TransportForceMergeAction.java:106)
	at org.elasticsearch.action.admin.indices.forcemerge.TransportForceMergeAction$$Lambda/0x000076bfcbc0d938.get(Unknown Source)
	at org.elasticsearch.action.ActionRunnable$2.accept(ActionRunnable.java:58)
	at org.elasticsearch.action.ActionRunnable$2.accept(ActionRunnable.java:55)
	at org.elasticsearch.action.ActionRunnable$4.doRun(ActionRunnable.java:101)
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1044)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
	at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
	at java.lang.Thread.run(java.base@24/Thread.java:1447)

@tlrx (Member Author):

I should have mentioned that previously we were executing prepareForEngineReset under a read lock. We made it that way to avoid the issue with refresh listeners accessing the engine.

We later noticed that it wasn't enough, since refresh listeners are also called when creating engines under the write lock, so we decided to acquire a read lock before the refresh lock.

With this last change, prepareForEngineReset can be executed under the write lock since it is guaranteed that no other thread can block while holding the refresh lock. But it now exposes a similar issue with flushes, so I applied the same solution as for refreshes: acquiring a read lock before flushes.
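
A minimal sketch of the lock ordering being described (assumed names, not the actual InternalEngine code): the flushing thread takes the engine read lock before the flush lock, so an engine reset holding the write lock can never end up waiting on a flush lock that is itself held by a thread queued behind that writer.

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class FlushLockOrderingSketch {
    private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
    private final ReentrantLock flushLock = new ReentrantLock();

    void flush() {
        engineResetLock.readLock().lock();   // 1. read lock first: excludes a concurrent reset
        try {
            flushLock.lock();                // 2. then the flush lock
            try {
                // ... write a Lucene commit; a refresh here re-acquires the read lock reentrantly
            } finally {
                flushLock.unlock();
            }
        } finally {
            engineResetLock.readLock().unlock();
        }
    }

    void resetEngine(Runnable prepareForEngineReset) {
        engineResetLock.writeLock().lock();  // excludes flushes and other engine users
        try {
            // may flush/refresh itself; no other thread can be holding the flush lock at this point
            prepareForEngineReset.run();
        } finally {
            engineResetLock.writeLock().unlock();
        }
    }
}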

@henningandersen (Contributor):

I am ok doing this for now, but I wonder if the problem is that IndexShard.forceMerge does not use withEngine? (like many other getEngine usages that we'll have to fix in follow-ups, I'd expect us to remove getEngine if we can)

@tlrx (Member Author):

For that specific stack trace, using withEngine within IndexShard.forceMerge would work. There are other callers of flush that would need the same adjustment, but it's too much work to tackle all at once.

(like many other getEngine usages that we'll have to fix in follow-ups, I'd expect us to remove getEngine if we can)

Yes, that's the plan, but I think we'll do this step by step.

@tlrx tlrx requested a review from henningandersen April 8, 2025 07:12
@tlrx (Member Author) commented Apr 8, 2025

Thanks for your feedback @henningandersen. I addressed your comments, removed a few more read lock acquisitions that were made unnecessary by the engineMutex, and also added a read lock acquisition before flushes.

I'm investigating the serverless build failures, but otherwise CI is happy here.

@tlrx tlrx force-pushed the ES-10826-the-return branch from 10ab76b to 07a8558 on April 8, 2025 07:20
@henningandersen (Contributor) left a comment:

LGTM.

@tlrx tlrx requested a review from fcofdez April 8, 2025 14:44
@fcofdez (Contributor) left a comment:

LGTM. Great job 🎉. I left a few minor comments, feel free to tackle them or not 👍.

Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
engineOrNull.onSettingsChanged();
engineResetLock.readLock().lock();
@fcofdez (Contributor):

nit: can't this use withEngine?

@tlrx (Member Author):

I pushed 2411a9b to use withEngine here.

@tlrx (Member Author):

... and just reverted, since withEngine ensures we're not calling from the cluster state applier thread.

Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
engineResetLock.readLock().lock();
@fcofdez (Contributor):

Same as above, maybe we can simplify this to call withEngine?

@tlrx (Member Author):

I prefer to keep it as it is here (otherwise we'd leak the engine outside of the withEngine method, something I'd like to prevent).
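
A hedged sketch of how a closure-based withEngine keeps the engine reference from escaping the read-lock scope (names assumed, not the actual IndexShard API; as noted earlier in this thread, the real helper also asserts it is not invoked from the cluster state applier thread):

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

class WithEngineSketch {
    private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
    private volatile Object currentEngine;   // stands in for the Engine reference

    <R> R withEngine(Function<Object, R> operation) {
        engineResetLock.readLock().lock();
        try {
            Object engine = currentEngine;
            if (engine == null) {
                // the real code throws Lucene's AlreadyClosedException here
                throw new IllegalStateException("engine is closed");
            }
            return operation.apply(engine);   // the engine never escapes this block
        } finally {
            engineResetLock.readLock().unlock();
        }
    }
}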

@@ -3879,7 +3976,7 @@ private void innerAcquireReplicaOperationPermit(
maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
resetEngineToGlobalCheckpoint();
rollbackEngineToGlobalCheckpoint();
@fcofdez (Contributor):

Nice change, this makes it clearer given that we also have a resetEngine function that does something slightly different.

holdEngineThread.start();
safeGet(hold);

assertThat(shard.getEngine(), instanceOf(InternalEngine.class));
@fcofdez (Contributor):

nit: maybe we can assert that the engine that we got from the hold future is the same instance?

@tlrx (Member Author):

Makes sense, I pushed 93c25da

closeShards(shard);
}

public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception {
@fcofdez (Contributor):

Great job on this test 👍

@tlrx tlrx merged commit 591fa87 into elastic:main Apr 10, 2025
17 checks passed
@tlrx (Member Author) commented Apr 10, 2025

Thanks @henningandersen and @fcofdez !

@tlrx tlrx deleted the ES-10826-the-return branch April 10, 2025 11:38