Skip to content

Commit 1f7a8c4

Browse files
authored
[fix][test]fix flaky test TxnLogBufferedWriterTest.testMainProcess (apache#18290)
1 parent 2de90b2 commit 1f7a8c4

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Random;
3636
import java.util.concurrent.ArrayBlockingQueue;
37+
import java.util.concurrent.ConcurrentHashMap;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.ExecutorService;
3940
import java.util.concurrent.ScheduledExecutorService;
@@ -193,7 +194,7 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize,
193194
batchedWriteMaxDelayInMillis, batchEnabled, DISABLED_BUFFERED_WRITER_METRICS);
194195
// Store the param-context, param-position, param-exception of callback function and complete-count for verify.
195196
List<Integer> contextArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
196-
List<ManagedLedgerException> exceptionArrayOfCallback = Collections.synchronizedList(new ArrayList<>());
197+
Map<Integer, ManagedLedgerException> exceptionArrayOfCallback = new ConcurrentHashMap<>();
197198
Map<PositionImpl, List<Position>> positionsOfCallback = Collections.synchronizedMap(new LinkedHashMap<>());
198199
AtomicBoolean anyFlushCompleted = new AtomicBoolean();
199200
TxnLogBufferedWriter.AddDataCallback callback = new TxnLogBufferedWriter.AddDataCallback(){
@@ -215,7 +216,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
215216
return;
216217
}
217218
contextArrayOfCallback.add((int)ctx);
218-
exceptionArrayOfCallback.add(exception);
219+
exceptionArrayOfCallback.put((int)ctx, exception);
219220
}
220221
};
221222
// Write many times.
@@ -252,8 +253,14 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
252253
Collections.sort(contextArrayOfCallback);
253254
}
254255
assertEquals(contextArrayOfCallback.size(), writeCmdExecuteCount);
255-
for (int ctxIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
256-
assertEquals(contextArrayOfCallback.get(ctxIndex).intValue(), ctxIndex);
256+
for (int ctxIndex = 0, successIndex = 0; ctxIndex < writeCmdExecuteCount; ctxIndex++){
257+
// When calling `txnLogBufferedWriter.close`, all tasks in the queue will fail immediately, this makes the
258+
// callback of failure task earlier.
259+
if (exceptionArrayOfCallback.containsKey(ctxIndex)){
260+
continue;
261+
}
262+
assertEquals(contextArrayOfCallback.get(successIndex).intValue(), ctxIndex);
263+
successIndex++;
257264
}
258265
// if {@param bookieError} is true. verify the ex count.
259266
// if {@param bookieError} is false. verify the param-position count.

0 commit comments

Comments
 (0)