diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 8351e2bcf7f42..ca07e3ac180c6 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -500,11 +500,6 @@ protected void onShardResult(Result result) { if (logger.isTraceEnabled()) { logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null); } - results.consumeResult(result, () -> onShardResultConsumed(result)); - } - - private void onShardResultConsumed(Result result) { - successfulOps.incrementAndGet(); // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level // so its ok concurrency wise to miss potentially the shard failures being created because of another failure // in the #addShardFailure, because by definition, it will happen on *another* shardIndex @@ -512,6 +507,11 @@ private void onShardResultConsumed(Result result) { if (shardFailures != null) { shardFailures.set(result.getShardIndex(), null); } + results.consumeResult(result, this::onShardResultConsumed); + } + + private void onShardResultConsumed() { + successfulOps.incrementAndGet(); // we need to increment successful ops first before we compare the exit condition otherwise if we // are fast we could concurrently update totalOps but then preempt one of the threads which can // cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.