
Commit 10ab76b

also read lock on flushes
1 parent 168419e commit 10ab76b

File tree

1 file changed: +82 -71 lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 82 additions & 71 deletions
@@ -2209,83 +2209,94 @@ protected void flushHoldingLock(boolean force, boolean waitIfOngoing, ActionList
             throw new IllegalArgumentException(message);
         }
         final long generation;
-        if (flushLock.tryLock() == false) {
-            // if we can't get the lock right away we block if needed otherwise barf
-            if (waitIfOngoing == false) {
-                logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
-                listener.onResponse(FlushResult.NO_FLUSH);
-                return;
-            }
-            logger.trace("waiting for in-flight flush to finish");
-            flushLock.lock();
-            logger.trace("acquired flush lock after blocking");
-        } else {
-            logger.trace("acquired flush lock immediately");
-        }
 
-        final long startTime = System.nanoTime();
+        // Acquire an engine read lock before the flush lock. If we were not acquiring a read lock here, a concurrent engine reset could
+        // hold the engine write lock and later be blocked waiting for the flush lock (still holding the write lock), while the current
+        // thread could be blocked waiting for the write lock to be released (and therefore never release the flush lock).
+        final var engineReadLock = engineConfig.getEngineResetLock().readLock();
+        engineReadLock.lock();
         try {
-            // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
-            // newly created commit points to a different translog generation (can free translog),
-            // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
-            boolean hasUncommittedChanges = hasUncommittedChanges();
-            if (hasUncommittedChanges
-                || force
-                || shouldPeriodicallyFlush()
-                || getProcessedLocalCheckpoint() > Long.parseLong(
-                    lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
-                )) {
-                ensureCanFlush();
-                Translog.Location commitLocation = getTranslogLastWriteLocation();
-                try {
-                    translog.rollGeneration();
-                    logger.trace("starting commit for flush; commitTranslog=true");
-                    long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();
-                    // Pre-emptively recording the upcoming segment generation so that the live version map archive records
-                    // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise,
-                    // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive,
-                    // we clear them from the archive once we see that segment generation on the search shards, but those changes
-                    // were not included in the commit since they happened right after it.
-                    preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
-                    commitIndexWriter(indexWriter, translog);
-                    logger.trace("finished commit for flush");
-                    // we need to refresh in order to clear older version values
-                    refresh("version_table_flush", SearcherScope.INTERNAL, true);
-                    translog.trimUnreferencedReaders();
-                    // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and
-                    // (2) indexWriter has committed all the changes (checks must be done in this order).
-                    // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended.
-                    final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation();
-                    if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) {
-                        assert writeLocationAfterFlush.compareTo(commitLocation) > 0 : writeLocationAfterFlush + " <= " + commitLocation;
-                        commitLocation = writeLocationAfterFlush;
-                    }
-                    // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in
-                    // the above lines would not lead the engine to think that it recently flushed, when it did not.
-                    this.lastFlushTimestamp = lastFlushTimestamp;
-                } catch (AlreadyClosedException e) {
-                    failOnTragicEvent(e);
-                    throw e;
-                } catch (Exception e) {
-                    throw new FlushFailedEngineException(shardId, e);
+            if (flushLock.tryLock() == false) {
+                // if we can't get the lock right away we block if needed otherwise barf
+                if (waitIfOngoing == false) {
+                    logger.trace("detected an in-flight flush, not blocking to wait for it's completion");
+                    listener.onResponse(FlushResult.NO_FLUSH);
+                    return;
                 }
-            refreshLastCommittedSegmentInfos();
-            generation = lastCommittedSegmentInfos.getGeneration();
-            flushListener.afterFlush(generation, commitLocation);
+                logger.trace("waiting for in-flight flush to finish");
+                flushLock.lock();
+                logger.trace("acquired flush lock after blocking");
             } else {
-                generation = lastCommittedSegmentInfos.getGeneration();
+                logger.trace("acquired flush lock immediately");
+            }
+
+            final long startTime = System.nanoTime();
+            try {
+                // Only flush if (1) Lucene has uncommitted docs, or (2) forced by caller, or (3) the
+                // newly created commit points to a different translog generation (can free translog),
+                // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries.
+                boolean hasUncommittedChanges = hasUncommittedChanges();
+                if (hasUncommittedChanges
+                    || force
+                    || shouldPeriodicallyFlush()
+                    || getProcessedLocalCheckpoint() > Long.parseLong(
+                        lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
+                    )) {
+                    ensureCanFlush();
+                    Translog.Location commitLocation = getTranslogLastWriteLocation();
+                    try {
+                        translog.rollGeneration();
+                        logger.trace("starting commit for flush; commitTranslog=true");
+                        long lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong();
+                        // Pre-emptively recording the upcoming segment generation so that the live version map archive records
+                        // the correct segment generation for doc IDs that go to the archive while a flush is happening. Otherwise,
+                        // if right after committing the IndexWriter new docs get indexed/updated and a refresh moves them to the archive,
+                        // we clear them from the archive once we see that segment generation on the search shards, but those changes
+                        // were not included in the commit since they happened right after it.
+                        preCommitSegmentGeneration.set(lastCommittedSegmentInfos.getGeneration() + 1);
+                        commitIndexWriter(indexWriter, translog);
+                        logger.trace("finished commit for flush");
+                        // we need to refresh in order to clear older version values
+                        refresh("version_table_flush", SearcherScope.INTERNAL, true);
+                        translog.trimUnreferencedReaders();
+                        // Update the translog location for flushListener if (1) the writeLocation has changed during the flush and
+                        // (2) indexWriter has committed all the changes (checks must be done in this order).
+                        // If the indexWriter has uncommitted changes, they will be flushed by the next flush as intended.
+                        final Translog.Location writeLocationAfterFlush = translog.getLastWriteLocation();
+                        if (writeLocationAfterFlush.equals(commitLocation) == false && hasUncommittedChanges() == false) {
+                            assert writeLocationAfterFlush.compareTo(commitLocation) > 0
+                                : writeLocationAfterFlush + " <= " + commitLocation;
+                            commitLocation = writeLocationAfterFlush;
+                        }
+                        // Use the timestamp from when the flush started, but only update it in case of success, so that any exception in
+                        // the above lines would not lead the engine to think that it recently flushed, when it did not.
+                        this.lastFlushTimestamp = lastFlushTimestamp;
+                    } catch (AlreadyClosedException e) {
+                        failOnTragicEvent(e);
+                        throw e;
+                    } catch (Exception e) {
+                        throw new FlushFailedEngineException(shardId, e);
+                    }
+                    refreshLastCommittedSegmentInfos();
+                    generation = lastCommittedSegmentInfos.getGeneration();
+                    flushListener.afterFlush(generation, commitLocation);
+                } else {
+                    generation = lastCommittedSegmentInfos.getGeneration();
+                }
+            } catch (FlushFailedEngineException ex) {
+                maybeFailEngine("flush", ex);
+                listener.onFailure(ex);
+                return;
+            } catch (Exception e) {
+                listener.onFailure(e);
+                return;
+            } finally {
+                totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime);
+                flushLock.unlock();
+                logger.trace("released flush lock");
             }
-        } catch (FlushFailedEngineException ex) {
-            maybeFailEngine("flush", ex);
-            listener.onFailure(ex);
-            return;
-        } catch (Exception e) {
-            listener.onFailure(e);
-            return;
         } finally {
-            totalFlushTimeExcludingWaitingOnLock.inc(System.nanoTime() - startTime);
-            flushLock.unlock();
-            logger.trace("released flush lock");
+            engineReadLock.unlock();
         }
 
         afterFlush(generation);
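
The new comment in the diff describes a classic lock-ordering deadlock: a reset thread holding the engine write lock can block forever on the flush lock while a flushing thread holds the flush lock and waits on the engine lock. The sketch below is a minimal, self-contained illustration of the ordering rule the commit enforces, using plain java.util.concurrent primitives; the class and field names are hypothetical and not the actual InternalEngine code (there, the engine reset lock comes from engineConfig.getEngineResetLock()). Every path takes the engine reset lock before the flush lock, so the wait-for graph cannot form a cycle.

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Minimal sketch, not the real implementation: both the flush path and the
    // reset path acquire the engine reset lock FIRST and the flush lock SECOND,
    // so no thread ever holds the flush lock while waiting on the engine lock.
    class LockOrderingSketch {
        private final ReentrantReadWriteLock engineResetLock = new ReentrantReadWriteLock();
        private final ReentrantLock flushLock = new ReentrantLock();

        void flush() {
            engineResetLock.readLock().lock(); // engine lock first (shared mode)
            try {
                flushLock.lock(); // flush lock second
                try {
                    // commit the IndexWriter, roll the translog, etc.
                } finally {
                    flushLock.unlock();
                }
            } finally {
                engineResetLock.readLock().unlock();
            }
        }

        void resetEngine() {
            engineResetLock.writeLock().lock(); // engine lock first (exclusive mode)
            try {
                flushLock.lock(); // flush lock second
                try {
                    // swap in the new engine
                } finally {
                    flushLock.unlock();
                }
            } finally {
                engineResetLock.writeLock().unlock();
            }
        }
    }

Because flushes only take the read side of the ReentrantReadWriteLock, they can still run concurrently with each other; only a reset, which takes the write side, excludes them, and with the consistent engine-lock-then-flush-lock order neither side can end up waiting on the other while holding the lock the other needs.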
