Skip to content

Commit 1f49000

Browse files
authored
Fix edge case for active flag for flush on idle (elastic#97332)
The active flag needs to be set to true after the op has been processed by the engine, not before. Otherwise there is an edge case: if a flush on idle runs while an op is still waiting to be processed, the op can be left unprocessed by that flush with the active flag set to false, and no further flush on idle is scheduled. This commit also introduces a test for this edge case. Fixes elastic#97154
1 parent 3ecb7e7 commit 1f49000

File tree

5 files changed

+164
-72
lines changed

5 files changed

+164
-72
lines changed

docs/changelog/97332.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 97332
2+
summary: Fix edge case for active flag for flush on idle
3+
area: Engine
4+
type: enhancement
5+
issues:
6+
- 97154

server/src/internalClusterTest/java/org/elasticsearch/indices/flush/FlushIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.plugins.Plugin;
2020
import org.elasticsearch.test.ESIntegTestCase;
2121
import org.elasticsearch.test.InternalSettingsPlugin;
22-
import org.elasticsearch.test.InternalTestCluster;
2322
import org.elasticsearch.xcontent.XContentType;
2423

2524
import java.util.Arrays;
@@ -128,7 +127,7 @@ public void testFlushOnInactive() throws Exception {
128127
client().prepareIndex(indexName).setSource("f", "v").get();
129128
}
130129
if (randomBoolean()) {
131-
internalCluster().restartNode(randomFrom(dataNodes), new InternalTestCluster.RestartCallback());
130+
internalCluster().restartNode(randomFrom(dataNodes));
132131
ensureGreen(indexName);
133132
}
134133
assertBusy(() -> {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 71 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1031,58 +1031,61 @@ public static Engine.Index prepareIndex(
10311031
}
10321032

10331033
private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOException {
1034-
active.set(true);
1035-
final Engine.IndexResult result;
1036-
final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
10371034
try {
1038-
if (logger.isTraceEnabled()) {
1039-
// don't use index.source().utf8ToString() here source might not be valid UTF-8
1040-
logger.trace(
1041-
"index [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
1042-
preIndex.id(),
1043-
preIndex.seqNo(),
1044-
routingEntry().allocationId(),
1045-
preIndex.primaryTerm(),
1046-
getOperationPrimaryTerm(),
1047-
preIndex.origin()
1048-
);
1049-
}
1050-
result = engine.index(preIndex);
1051-
if (logger.isTraceEnabled()) {
1052-
logger.trace(
1053-
"index-done [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}] "
1054-
+ "result-seq# [{}] result-term [{}] failure [{}]",
1055-
preIndex.id(),
1056-
preIndex.seqNo(),
1057-
routingEntry().allocationId(),
1058-
preIndex.primaryTerm(),
1059-
getOperationPrimaryTerm(),
1060-
preIndex.origin(),
1061-
result.getSeqNo(),
1062-
result.getTerm(),
1063-
result.getFailure()
1064-
);
1065-
}
1066-
} catch (Exception e) {
1067-
if (logger.isTraceEnabled()) {
1068-
logger.trace(
1069-
() -> format(
1070-
"index-fail [%s] seq# [%s] allocation-id [%s] primaryTerm [%s] operationPrimaryTerm [%s] origin [%s]",
1035+
final Engine.IndexResult result;
1036+
final Engine.Index preIndex = indexingOperationListeners.preIndex(shardId, index);
1037+
try {
1038+
if (logger.isTraceEnabled()) {
1039+
// don't use index.source().utf8ToString() here source might not be valid UTF-8
1040+
logger.trace(
1041+
"index [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}]",
10711042
preIndex.id(),
10721043
preIndex.seqNo(),
10731044
routingEntry().allocationId(),
10741045
preIndex.primaryTerm(),
10751046
getOperationPrimaryTerm(),
10761047
preIndex.origin()
1077-
),
1078-
e
1079-
);
1048+
);
1049+
}
1050+
result = engine.index(preIndex);
1051+
if (logger.isTraceEnabled()) {
1052+
logger.trace(
1053+
"index-done [{}] seq# [{}] allocation-id [{}] primaryTerm [{}] operationPrimaryTerm [{}] origin [{}] "
1054+
+ "result-seq# [{}] result-term [{}] failure [{}]",
1055+
preIndex.id(),
1056+
preIndex.seqNo(),
1057+
routingEntry().allocationId(),
1058+
preIndex.primaryTerm(),
1059+
getOperationPrimaryTerm(),
1060+
preIndex.origin(),
1061+
result.getSeqNo(),
1062+
result.getTerm(),
1063+
result.getFailure()
1064+
);
1065+
}
1066+
} catch (Exception e) {
1067+
if (logger.isTraceEnabled()) {
1068+
logger.trace(
1069+
() -> format(
1070+
"index-fail [%s] seq# [%s] allocation-id [%s] primaryTerm [%s] operationPrimaryTerm [%s] origin [%s]",
1071+
preIndex.id(),
1072+
preIndex.seqNo(),
1073+
routingEntry().allocationId(),
1074+
preIndex.primaryTerm(),
1075+
getOperationPrimaryTerm(),
1076+
preIndex.origin()
1077+
),
1078+
e
1079+
);
1080+
}
1081+
indexingOperationListeners.postIndex(shardId, preIndex, e);
1082+
throw e;
10801083
}
1081-
indexingOperationListeners.postIndex(shardId, preIndex, e);
1082-
throw e;
1084+
indexingOperationListeners.postIndex(shardId, preIndex, result);
1085+
return result;
1086+
} finally {
1087+
active.set(true);
10831088
}
1084-
indexingOperationListeners.postIndex(shardId, preIndex, result);
1085-
return result;
10861089
}
10871090

10881091
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, long opPrimaryTerm, String reason) throws IOException {
@@ -1100,11 +1103,14 @@ private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrim
11001103
}
11011104

11021105
private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) throws IOException {
1103-
active.set(true);
1104-
if (logger.isTraceEnabled()) {
1105-
logger.trace("noop (seq# [{}])", noOp.seqNo());
1106+
try {
1107+
if (logger.isTraceEnabled()) {
1108+
logger.trace("noop (seq# [{}])", noOp.seqNo());
1109+
}
1110+
return engine.noOp(noOp);
1111+
} finally {
1112+
active.set(true);
11061113
}
1107-
return engine.noOp(noOp);
11081114
}
11091115

11101116
public Engine.IndexResult getFailedIndexResult(Exception e, long version, String id) {
@@ -1164,23 +1170,26 @@ private Engine.DeleteResult applyDeleteOperation(
11641170
assert opPrimaryTerm <= getOperationPrimaryTerm()
11651171
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
11661172
ensureWriteAllowed(origin);
1167-
active.set(true);
1168-
Engine.Delete delete = indexingOperationListeners.preDelete(
1169-
shardId,
1170-
prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm)
1171-
);
1172-
final Engine.DeleteResult result;
11731173
try {
1174-
if (logger.isTraceEnabled()) {
1175-
logger.trace("delete [{}] (seq no [{}])", delete.uid().text(), delete.seqNo());
1174+
Engine.Delete delete = indexingOperationListeners.preDelete(
1175+
shardId,
1176+
prepareDelete(id, seqNo, opPrimaryTerm, version, versionType, origin, ifSeqNo, ifPrimaryTerm)
1177+
);
1178+
final Engine.DeleteResult result;
1179+
try {
1180+
if (logger.isTraceEnabled()) {
1181+
logger.trace("delete [{}] (seq no [{}])", delete.uid().text(), delete.seqNo());
1182+
}
1183+
result = engine.delete(delete);
1184+
} catch (Exception e) {
1185+
indexingOperationListeners.postDelete(shardId, delete, e);
1186+
throw e;
11761187
}
1177-
result = engine.delete(delete);
1178-
} catch (Exception e) {
1179-
indexingOperationListeners.postDelete(shardId, delete, e);
1180-
throw e;
1188+
indexingOperationListeners.postDelete(shardId, delete, result);
1189+
return result;
1190+
} finally {
1191+
active.set(true);
11811192
}
1182-
indexingOperationListeners.postDelete(shardId, delete, result);
1183-
return result;
11841193
}
11851194

11861195
public static Engine.Delete prepareDelete(
@@ -2228,6 +2237,7 @@ public void flushOnIdle(long inactiveTimeNS) {
22282237
@Override
22292238
public void onFailure(Exception e) {
22302239
if (state != IndexShardState.CLOSED) {
2240+
active.set(true);
22312241
logger.warn("failed to flush shard on inactive", e);
22322242
}
22332243
}

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6262
import org.elasticsearch.common.util.concurrent.AtomicArray;
6363
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
64+
import org.elasticsearch.common.util.concurrent.ReleasableLock;
6465
import org.elasticsearch.core.Assertions;
6566
import org.elasticsearch.core.CheckedFunction;
6667
import org.elasticsearch.core.Releasable;
@@ -156,6 +157,7 @@
156157
import java.util.concurrent.atomic.AtomicInteger;
157158
import java.util.concurrent.atomic.AtomicLong;
158159
import java.util.concurrent.atomic.AtomicReference;
160+
import java.util.concurrent.locks.ReentrantReadWriteLock;
159161
import java.util.function.BiConsumer;
160162
import java.util.function.Consumer;
161163
import java.util.function.Function;
@@ -4046,6 +4048,68 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
40464048
}
40474049
}
40484050

4051+
public void testFlushOnIdleAfterOp() throws Exception {
4052+
// Holding the write lock makes the index/delete op halt before it is processed by the engine
4053+
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
4054+
final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
4055+
final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
4056+
IndexShard shard = newStartedShard(true, Settings.EMPTY, new IndexingOperationListener() {
4057+
@Override
4058+
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
4059+
try (ReleasableLock lock = readLock.acquire()) {
4060+
return operation;
4061+
}
4062+
}
4063+
4064+
@Override
4065+
public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) {
4066+
try (ReleasableLock lock = readLock.acquire()) {
4067+
return delete;
4068+
}
4069+
}
4070+
});
4071+
4072+
indexDoc(shard, "_doc", "0");
4073+
indexDoc(shard, "_doc", "1");
4074+
4075+
// Do a flush on idle
4076+
long flushesBefore = shard.flushStats().getPeriodic();
4077+
shard.flushOnIdle(0);
4078+
assertBusy(() -> assertThat(shard.flushStats().getPeriodic(), equalTo(flushesBefore + 1)));
4079+
assertFalse(shard.isActive());
4080+
4081+
// Index or delete a doc and halt it before being processed by the engine
4082+
boolean indexElseDelete = randomBoolean();
4083+
Thread t = new Thread(() -> {
4084+
try {
4085+
if (indexElseDelete) {
4086+
indexDoc(shard, "_doc", "2");
4087+
} else {
4088+
deleteDoc(shard, "0");
4089+
}
4090+
} catch (IOException e) {
4091+
throw new AssertionError("failed while processing op [" + e.getMessage() + "]");
4092+
}
4093+
});
4094+
try (ReleasableLock lock = writeLock.acquire()) {
4095+
t.start();
4096+
assertBusy(() -> assertThat(rwl.getQueueLength(), equalTo(1)));
4097+
assertFalse(shard.isActive());
4098+
} // Allow op to complete
4099+
4100+
t.join();
4101+
4102+
assertTrue(shard.isActive()); // should become active after the op has completed
4103+
4104+
// Do a flush on idle
4105+
shard.flushOnIdle(0);
4106+
assertBusy(() -> assertThat(shard.flushStats().getPeriodic(), equalTo(flushesBefore + 2)));
4107+
assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0));
4108+
assertFalse(shard.isActive());
4109+
4110+
closeShards(shard);
4111+
}
4112+
40494113
public void testOnCloseStats() throws IOException {
40504114
final IndexShard indexShard = newStartedShard(true);
40514115

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,14 @@ protected IndexShard newShard(final boolean primary, final Settings settings) th
217217
* another shard)
218218
* @param settings the settings to use for this shard
219219
* @param engineFactory the engine factory to use for this shard
220+
* @param listeners the indexing operation listeners to add
220221
*/
221-
protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException {
222+
protected IndexShard newShard(
223+
boolean primary,
224+
Settings settings,
225+
EngineFactory engineFactory,
226+
final IndexingOperationListener... listeners
227+
) throws IOException {
222228
final RecoverySource recoverySource = primary
223229
? RecoverySource.EmptyStoreRecoverySource.INSTANCE
224230
: RecoverySource.PeerRecoverySource.INSTANCE;
@@ -229,7 +235,7 @@ protected IndexShard newShard(boolean primary, Settings settings, EngineFactory
229235
ShardRoutingState.INITIALIZING,
230236
recoverySource
231237
);
232-
return newShard(shardRouting, settings, engineFactory);
238+
return newShard(shardRouting, settings, engineFactory, listeners);
233239
}
234240

235241
protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException {
@@ -606,11 +612,13 @@ protected IndexShard newStartedShard(final boolean primary) throws IOException {
606612
/**
607613
* Creates a new empty shard and starts it.
608614
*
609-
* @param primary controls whether the shard will be a primary or a replica.
610-
* @param settings the settings to use for this shard
615+
* @param primary controls whether the shard will be a primary or a replica.
616+
* @param settings the settings to use for this shard
617+
* @param listeners the indexing operation listeners to add
611618
*/
612-
protected IndexShard newStartedShard(final boolean primary, Settings settings) throws IOException {
613-
return newStartedShard(primary, settings, new InternalEngineFactory());
619+
protected IndexShard newStartedShard(final boolean primary, Settings settings, final IndexingOperationListener... listeners)
620+
throws IOException {
621+
return newStartedShard(primary, settings, new InternalEngineFactory(), listeners);
614622
}
615623

616624
/**
@@ -619,10 +627,15 @@ protected IndexShard newStartedShard(final boolean primary, Settings settings) t
619627
* @param primary controls whether the shard will be a primary or a replica.
620628
* @param settings the settings to use for this shard
621629
* @param engineFactory the engine factory to use for this shard
630+
* @param listeners the indexing operation listeners to add
622631
*/
623-
protected IndexShard newStartedShard(final boolean primary, final Settings settings, final EngineFactory engineFactory)
624-
throws IOException {
625-
return newStartedShard(p -> newShard(p, settings, engineFactory), primary);
632+
protected IndexShard newStartedShard(
633+
final boolean primary,
634+
final Settings settings,
635+
final EngineFactory engineFactory,
636+
final IndexingOperationListener... listeners
637+
) throws IOException {
638+
return newStartedShard(p -> newShard(p, settings, engineFactory, listeners), primary);
626639
}
627640

628641
/**

0 commit comments

Comments
 (0)