Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,18 +500,18 @@ protected void onShardResult(Result result) {
if (logger.isTraceEnabled()) {
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
}
results.consumeResult(result, () -> onShardResultConsumed(result));
}

private void onShardResultConsumed(Result result) {
successfulOps.incrementAndGet();
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
// so its ok concurrency wise to miss potentially the shard failures being created because of another failure
// in the #addShardFailure, because by definition, it will happen on *another* shardIndex
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
if (shardFailures != null) {
shardFailures.set(result.getShardIndex(), null);
}
results.consumeResult(result, this::onShardResultConsumed);
}

private void onShardResultConsumed() {
successfulOps.incrementAndGet();
// we need to increment successful ops first before we compare the exit condition otherwise if we
// are fast we could concurrently update totalOps but then preempt one of the threads which can
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
Expand Down