Revive read/write engine lock to guard operations against resets #126311
Conversation
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)
Force-pushed a72a005 to a925aee.
This direction looks good. Only reviewed production parts for now. Left a few comments but nothing major.
// The shard uses a reentrant read/write lock to guard against engine changes, a type of lock that prioritizes the threads
// waiting for the write lock over the threads trying to acquire a (non-reentrant) read lock. Because refresh listeners
// are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
Suggested change:
- // are accessing the engine read lock, we need to ensure that they won't block if another thread is waiting for the
+ // sometimes access the engine read lock, we need to ensure that they won't block if another thread is waiting for the
I pushed a6cad19
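For context, a minimal plain-JDK illustration of the behavior this comment describes (illustrative only; the PR's EngineResetLock may expose a different API):

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Once a writer is queued on a ReentrantReadWriteLock, new blocking readLock().lock()
// calls park behind it. A listener that can fire while an engine reset holds or awaits
// the write lock should therefore use tryLock(), which barges past queued writers and
// fails fast when the write lock is held by another thread, instead of deadlocking.
class RefreshListenerSketch {
    private final ReentrantReadWriteLock engineLock = new ReentrantReadWriteLock();

    void onAfterRefresh() {
        if (engineLock.readLock().tryLock()) { // never parks behind a queued writer
            try {
                // safe to touch engine state here
            } finally {
                engineLock.readLock().unlock();
            }
        }
        // else: skip the engine access, a reset is in progress or pending
    }
}
```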
final long startTime = System.nanoTime();
// Acquire an engine read lock before the flush lock. If we were not acquiring a read lock here, a concurrent engine reset could
// hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current
// thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock).
It's been detected by a test failure; see this thread dump:
Found one Java-level deadlock:
=============================
"elasticsearch[node_t3][clusterApplierService#updateTask][T#1]":
waiting for ownable synchronizer 0x00000000e1ff06e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
which is held by "elasticsearch[node_t3][generic][T#2]"
"elasticsearch[node_t3][generic][T#2]":
waiting for ownable synchronizer 0x00000000e3a10a30, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "elasticsearch[node_t3][force_merge][T#1]"
"elasticsearch[node_t3][force_merge][T#1]":
waiting for ownable synchronizer 0x00000000e1ff06e0, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
which is held by "elasticsearch[node_t3][generic][T#2]"
Java stack information for the threads listed above:
===================================================
"elasticsearch[node_t3][clusterApplierService#updateTask][T#1]":
at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
- parking to wait for <0x00000000e1ff06e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(java.base@24/AbstractQueuedSynchronizer.java:1118)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(java.base@24/ReentrantReadWriteLock.java:739)
at org.elasticsearch.index.shard.IndexShard.getEngine(IndexShard.java:3350)
at org.elasticsearch.index.shard.IndexShard.commitStats(IndexShard.java:1374)
at org.elasticsearch.index.IndexService.getNodeMappingStats(IndexService.java:356)
at co.elastic.elasticsearch.stateless.autoscaling.memory.ShardsMappingSizeCollector.updateMappingMetricsForAllIndices(ShardsMappingSizeCollector.java:277)
at co.elastic.elasticsearch.stateless.autoscaling.memory.ShardsMappingSizeCollector.clusterChanged(ShardsMappingSizeCollector.java:205)
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateListener(ClusterApplierService.java:594)
at org.elasticsearch.cluster.service.ClusterApplierService.callClusterStateListeners(ClusterApplierService.java:579)
at org.elasticsearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:538)
at org.elasticsearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:460)
at org.elasticsearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:159)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:977)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:218)
at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:184)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
at java.lang.Thread.run(java.base@24/Thread.java:1447)
"elasticsearch[node_t3][generic][T#2]":
at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
- parking to wait for <0x00000000e3a10a30> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:1029)
at java.util.concurrent.locks.ReentrantLock$Sync.lock(java.base@24/ReentrantLock.java:154)
at java.util.concurrent.locks.ReentrantLock.lock(java.base@24/ReentrantLock.java:323)
at org.elasticsearch.index.engine.InternalEngine.flushHoldingLock(InternalEngine.java:2220)
at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHoldingLock(IndexEngine.java:336)
at org.elasticsearch.index.engine.Engine.flush(Engine.java:1334)
at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHollow(IndexEngine.java:398)
at co.elastic.elasticsearch.stateless.engine.IndexEngine.prepareForEngineReset(IndexEngine.java:458)
at org.elasticsearch.index.shard.IndexShard.resetEngine(IndexShard.java:4452)
- locked <0x00000000e3a10a20> (a java.lang.Object)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$handleStartRelocationWithFreshClusterState$13(TransportStatelessPrimaryRelocationAction.java:305)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcbae6dd0.accept(Unknown Source)
at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:852)
at org.elasticsearch.index.shard.IndexShard$2.onResponse(IndexShard.java:803)
at org.elasticsearch.action.ActionListener$3.onResponse(ActionListener.java:413)
at org.elasticsearch.index.shard.IndexShardOperationPermits$1.doRun(IndexShardOperationPermits.java:119)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
at org.elasticsearch.common.util.concurrent.EsExecutors$DirectExecutorService.execute(EsExecutors.java:316)
at org.elasticsearch.index.shard.IndexShardOperationPermits.waitUntilBlocked(IndexShardOperationPermits.java:94)
at org.elasticsearch.index.shard.IndexShardOperationPermits.blockOperations(IndexShardOperationPermits.java:90)
at org.elasticsearch.index.shard.IndexShard.relocated(IndexShard.java:803)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$handleStartRelocationWithFreshClusterState$14(TransportStatelessPrimaryRelocationAction.java:282)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcbae5d78.accept(Unknown Source)
at org.elasticsearch.action.ActionListenerImplementations$ResponseWrappingActionListener.onResponse(ActionListenerImplementations.java:261)
at org.elasticsearch.action.support.SubscribableListener$SuccessResult.complete(SubscribableListener.java:394)
at org.elasticsearch.action.support.SubscribableListener.tryComplete(SubscribableListener.java:314)
at org.elasticsearch.action.support.SubscribableListener.addListener(SubscribableListener.java:210)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.handleStartRelocationWithFreshClusterState(TransportStatelessPrimaryRelocationAction.java:276)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$1.accept(TransportStatelessPrimaryRelocationAction.java:222)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$1.accept(TransportStatelessPrimaryRelocationAction.java:219)
at org.elasticsearch.indices.recovery.PeerRecoverySourceClusterStateDelay.ensureClusterStateVersion(PeerRecoverySourceClusterStateDelay.java:42)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.handleStartRelocation(TransportStatelessPrimaryRelocationAction.java:210)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction.lambda$new$2(TransportStatelessPrimaryRelocationAction.java:166)
at co.elastic.elasticsearch.stateless.recovery.TransportStatelessPrimaryRelocationAction$$Lambda/0x000076bfcb7872d8.messageReceived(Unknown Source)
at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:90)
at org.elasticsearch.transport.InboundHandler.doHandleRequest(InboundHandler.java:289)
at org.elasticsearch.transport.InboundHandler$1.doRun(InboundHandler.java:302)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1044)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
at java.lang.Thread.run(java.base@24/Thread.java:1447)
"elasticsearch[node_t3][force_merge][T#1]":
at jdk.internal.misc.Unsafe.park(java.base@24/Native Method)
- parking to wait for <0x00000000e1ff06e0> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(java.base@24/LockSupport.java:223)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(java.base@24/AbstractQueuedSynchronizer.java:789)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(java.base@24/AbstractQueuedSynchronizer.java:1118)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(java.base@24/ReentrantReadWriteLock.java:739)
at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:2063)
at org.elasticsearch.index.engine.InternalEngine.flushHoldingLock(InternalEngine.java:2253)
at co.elastic.elasticsearch.stateless.engine.IndexEngine.flushHoldingLock(IndexEngine.java:336)
at org.elasticsearch.index.engine.Engine.flush(Engine.java:1334)
at org.elasticsearch.index.engine.Engine.flush(Engine.java:1315)
at org.elasticsearch.index.engine.InternalEngine.forceMerge(InternalEngine.java:2457)
at co.elastic.elasticsearch.stateless.engine.IndexEngine.forceMerge(IndexEngine.java:693)
at org.elasticsearch.index.shard.IndexShard.forceMerge(IndexShard.java:1575)
at org.elasticsearch.action.admin.indices.forcemerge.TransportForceMergeAction.lambda$shardOperation$3(TransportForceMergeAction.java:106)
at org.elasticsearch.action.admin.indices.forcemerge.TransportForceMergeAction$$Lambda/0x000076bfcbc0d938.get(Unknown Source)
at org.elasticsearch.action.ActionRunnable$2.accept(ActionRunnable.java:58)
at org.elasticsearch.action.ActionRunnable$2.accept(ActionRunnable.java:55)
at org.elasticsearch.action.ActionRunnable$4.doRun(ActionRunnable.java:101)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1044)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@24/ThreadPoolExecutor.java:1095)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@24/ThreadPoolExecutor.java:619)
at java.lang.Thread.runWith(java.base@24/Thread.java:1460)
at java.lang.Thread.run(java.base@24/Thread.java:1447)
I should have clarified that previously we were executing prepareForEngineReset under a read lock. We made it that way to avoid the issue with refresh listeners accessing the engine. We later noticed that it wasn't enough, since refresh listeners are also called when creating engines under the write lock, so we decided to acquire a read lock before the refresh lock. With this last change, prepareForEngineReset can be executed under the write lock since it is guaranteed that no other thread will block on the refresh lock. But it now exposes a similar issue with flushes, so I applied the same solution as for refresh: acquiring a read lock before flushes.
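To make the ordering concrete, here is a minimal sketch with illustrative names (not the actual IndexShard/InternalEngine code) of the rule that removes the cycle shown in the thread dump: always take the engine read lock before the flush lock.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class FlushOrderingSketch {
    private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
    private final ReentrantLock flushLock = new ReentrantLock();

    void flush() {
        engineResetLock.readLock().lock(); // engine read lock first ...
        try {
            flushLock.lock(); // ... then the flush lock
            try {
                // write and commit segments
            } finally {
                flushLock.unlock();
            }
        } finally {
            engineResetLock.readLock().unlock();
        }
    }

    void resetEngine() {
        engineResetLock.writeLock().lock();
        try {
            // prepareForEngineReset() may flush: the flush lock is guaranteed free
            // here, because holding it implies holding the read lock, and the write
            // lock excludes all other readers. The write-lock holder itself may still
            // take the read lock, since ReentrantReadWriteLock permits that downgrade.
            flush();
        } finally {
            engineResetLock.writeLock().unlock();
        }
    }
}
```

With this ordering, a flushing thread either already holds a read lock (so no reset can hold the write lock) or parks on the read lock without owning the flush lock, so the reset can always complete.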
I am ok doing this for now, but I wonder if the problem is that IndexShard.forceMerge does not use withEngine? (Like many other getEngine usages that we'll have to fix in follow-ups, I'd expect us to remove getEngine if we can.)
For that specific stack trace, using withEngine within IndexShard.forceMerge will work. There are other callers of flush that would need the same adjustments, but it's too much work to tackle all at once.

> (like many other getEngine usages that we'll have to fix in follow-ups, I'd expect us to remove getEngine if we can)

Yes, that's the plan, but I think we'll move step by step.
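For readers following along, a hedged sketch of what a withEngine-style helper looks like (field names are illustrative; the real IndexShard method may differ). The engine reference never escapes the read lock's scope, which is why it is preferred over getEngine: a caller cannot keep using an instance that a concurrent reset has already swapped out.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.index.engine.Engine;

class WithEngineSketch {
    private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
    private volatile Engine currentEngine;

    <T> T withEngine(Function<Engine, T> operation) {
        engineResetLock.readLock().lock();
        try {
            Engine engine = currentEngine; // stable while the read lock is held
            if (engine == null) {
                throw new AlreadyClosedException("engine is closed");
            }
            return operation.apply(engine);
        } finally {
            engineResetLock.readLock().unlock();
        }
    }
}
```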
Thanks for your feedback @henningandersen. I addressed your comments and removed a few more read lock acquisitions that were unnecessary. I'm investigating the serverless build failures, but otherwise CI is happy here.
…t resets (elastic#124635)" (elastic#125915)" This reverts commit 7fadeeb.
Force-pushed 10ab76b to 07a8558.
LGTM.
LGTM. Great job 🎉. I left a few minor comments, feel free to tackle them or not 👍.
Engine engineOrNull = getEngineOrNull();
if (engineOrNull != null) {
engineOrNull.onSettingsChanged();
engineResetLock.readLock().lock();
nit: can't this use withEngine?
I pushed 2411a9b to use withEngine here.
... and just reverted, since withEngine ensures we're not calling it from the cluster state applier thread.
Engine engine = getEngineOrNull();
if (engine == null) {
throw new AlreadyClosedException("engine is closed");
engineResetLock.readLock().lock();
Same as above, maybe we can simplify this to call withEngine?
I prefer to keep it as it is here (otherwise we'd leak the engine outside of the withEngine method, something I'd like to prevent)
@@ -3879,7 +3976,7 @@ private void innerAcquireReplicaOperationPermit(
    maxSeqNo
);
if (currentGlobalCheckpoint < maxSeqNo) {
-    resetEngineToGlobalCheckpoint();
+    rollbackEngineToGlobalCheckpoint();
Nice change, this makes it clearer given the fact that we have a resetEngine function that does something slightly different.
holdEngineThread.start();
safeGet(hold);

assertThat(shard.getEngine(), instanceOf(InternalEngine.class));
nit: maybe we can assert that the engine that we got from the hold future is the same instance?
Makes sense, I pushed 93c25da
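A sketch of what that assertion might look like, assuming the hold future completes with the engine instance captured by holdEngineThread:

```java
// Hypothetical assertion using Hamcrest's sameInstance
assertThat(shard.getEngine(), sameInstance(safeGet(hold)));
```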
closeShards(shard);
}

public void testReentrantEngineReadLockAcquisitionInRefreshListener() throws Exception {
Great job on this test 👍
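As a plain-JDK illustration of what the test exercises (not the PR's EngineResetLock), the snippet below shows a thread that already holds the read lock reacquiring it while a writer is queued, which the JDK's ReentrantReadWriteLock permits; a non-reentrant acquisition in the same situation would park behind the writer:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantReadDemo {
    public static void main(String[] args) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        lock.readLock().lock(); // main thread holds the read lock

        Thread writer = new Thread(() -> lock.writeLock().lock());
        writer.start();
        while (lock.hasQueuedThreads() == false) {
            Thread.onSpinWait(); // wait until the writer is parked in the queue
        }

        lock.readLock().lock(); // reentrant read: succeeds despite the queued writer
        System.out.println("reentrant read acquisition succeeded");
        lock.readLock().unlock();
        lock.readLock().unlock(); // release both holds; the writer can now proceed
    }
}
```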
Thanks @henningandersen and @fcofdez!
Draft pull request that re-introduces the engine read/write lock to guard against engine resets.

It differs from #124635 on the following:

- engineMutex for creating/closing engines

To do:

- Replace getEngine by withEngine() in most places?