3737import io .awspring .cloud .sqs .listener .acknowledgement .AcknowledgementResultCallback ;
3838import io .awspring .cloud .sqs .listener .acknowledgement .handler .AcknowledgementHandler ;
3939import io .awspring .cloud .sqs .listener .acknowledgement .handler .OnSuccessAcknowledgementHandler ;
40+ import io .awspring .cloud .sqs .operations .SqsTemplate ;
4041import java .time .Duration ;
4142import java .util .ArrayList ;
4243import java .util .Collection ;
4849import java .util .concurrent .CompletableFuture ;
4950import java .util .concurrent .ConcurrentHashMap ;
5051import java .util .concurrent .CountDownLatch ;
51- import java .util .concurrent .ExecutionException ;
5252import java .util .concurrent .TimeUnit ;
5353import java .util .concurrent .atomic .AtomicBoolean ;
5454import java .util .concurrent .atomic .AtomicInteger ;
5959import org .slf4j .LoggerFactory ;
6060import org .springframework .beans .factory .SmartInitializingSingleton ;
6161import org .springframework .beans .factory .annotation .Autowired ;
62- import org .springframework .beans .factory .annotation .Qualifier ;
6362import org .springframework .boot .test .context .SpringBootTest ;
6463import org .springframework .context .annotation .Bean ;
6564import org .springframework .context .annotation .Configuration ;
6665import org .springframework .context .annotation .Import ;
6766import org .springframework .messaging .Message ;
6867import org .springframework .messaging .handler .annotation .Header ;
68+ import org .springframework .messaging .support .MessageBuilder ;
6969import org .springframework .util .Assert ;
7070import org .springframework .util .StopWatch ;
7171import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
7272import software .amazon .awssdk .services .sqs .model .QueueAttributeName ;
73- import software .amazon .awssdk .services .sqs .model .SendMessageBatchRequestEntry ;
74- import software .amazon .awssdk .services .sqs .model .SendMessageBatchResponse ;
7573
7674/**
7775 * Integration tests for handling SQS FIFO queues.
7876 *
7977 * @author Tomaz Fernandes
78+ * @author Mikhail Strokov
8079 */
8180@ SpringBootTest
8281class SqsFifoIntegrationTests extends BaseSqsIntegrationTest {
@@ -101,16 +100,13 @@ class SqsFifoIntegrationTests extends BaseSqsIntegrationTest {
101100
102101 static final String FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME = "fifo_manually_create_batch_factory_test_queue.fifo" ;
103102
104- private static final String TEST_SQS_ASYNC_CLIENT_BEAN_NAME = "testSqsAsyncClient" ;
105-
106103 private static final String ERROR_ON_ACK_FACTORY = "errorOnAckFactory" ;
107104
108105 @ Autowired
109106 LatchContainer latchContainer ;
110107
111108 @ Autowired
112- @ Qualifier (TEST_SQS_ASYNC_CLIENT_BEAN_NAME )
113- SqsAsyncClient sqsAsyncClient ;
109+ SqsTemplate sqsTemplate ;
114110
115111 @ Autowired
116112 ObjectMapper objectMapper ;
@@ -176,6 +172,7 @@ public void afterSingletonsInstantiated() {
176172 loadSimulator .setBound (1000 );
177173 loadSimulator .setRandom (true );
178174 }
175+
179176 }
180177
181178 @ Test
@@ -184,15 +181,14 @@ void receivesMessagesInOrder() throws Exception {
184181 String messageGroupId = UUID .randomUUID ().toString ();
185182 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
186183 .collect (toList ());
187- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_MESSAGES_IN_ORDER_QUEUE_NAME );
188- sendMessageTo ( queueUrl , values , messageGroupId );
184+ sqsTemplate . sendMany (FIFO_RECEIVES_MESSAGES_IN_ORDER_QUEUE_NAME ,
185+ createMessagesFromValues ( messageGroupId , values ) );
189186 assertThat (latchContainer .receivesMessageLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS )).isTrue ();
190187 assertThat (receivesMessageInOrderListener .receivedMessages ).containsExactlyElementsOf (values );
191188 }
192189
193190 @ Test
194191 void receivesMessagesInOrderFromManyMessageGroups () throws Exception {
195- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME );
196192 int messagesPerTest = Math .max (this .settings .messagesPerTest , 30 );
197193 int numberOfMessageGroups = messagesPerTest / Math .max (this .settings .messagesPerMessageGroup , 10 );
198194 int messagesPerMessageGroup = Math .max (messagesPerTest / numberOfMessageGroups , 1 );
@@ -206,7 +202,21 @@ void receivesMessagesInOrderFromManyMessageGroups() throws Exception {
206202 LoadSimulator loadSimulator = new LoadSimulator ().setLoadEnabled (true ).setRandom (true ).setBound (20 );
207203 IntStream .range (0 , messageGroups .size ()).forEach (index -> {
208204 if (this .settings .sendMessages ) {
209- sendMessageTo (queueUrl , values , messageGroups .get (index ));
205+ try {
206+ if (useLocalStackClient ) {
207+ sqsTemplate .sendMany (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME ,
208+ createMessagesFromValues (messageGroups .get (index ), values ));
209+ }
210+ else {
211+ sqsTemplate .sendManyAsync (FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME ,
212+ createMessagesFromValues (messageGroups .get (index ), values ));
213+ }
214+ }
215+ catch (Exception e ) {
216+ logger .error ("Error sending messages to queue {}" ,
217+ FIFO_RECEIVES_MESSAGE_IN_ORDER_MANY_GROUPS_QUEUE_NAME , e );
218+ throw (RuntimeException ) e ;
219+ }
210220 }
211221 if (index % 10 == 0 ) {
212222 loadSimulator .runLoad ();
@@ -233,8 +243,8 @@ void stopsProcessingAfterException() throws Exception {
233243 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
234244 .collect (toList ());
235245 String messageGroupId = UUID .randomUUID ().toString ();
236- String queueUrl = fetchQueueUrl (FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME );
237- sendMessageTo ( queueUrl , values , messageGroupId );
246+ sqsTemplate . sendMany (FIFO_STOPS_PROCESSING_ON_ERROR_QUEUE_NAME ,
247+ createMessagesFromValues ( messageGroupId , values ) );
238248 assertThat (latchContainer .stopsProcessingOnErrorLatch1 .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
239249 .isTrue ();
240250 logger .debug ("receivedMessagesBeforeException: {}" , stopsOnErrorListener .receivedMessagesBeforeException );
@@ -263,8 +273,8 @@ void stopsProcessingAfterAckException() throws Exception {
263273 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
264274 .collect (toList ());
265275 String messageGroupId = UUID .randomUUID ().toString ();
266- String queueUrl = fetchQueueUrl (FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME );
267- sendMessageTo ( queueUrl , values , messageGroupId );
276+ sqsTemplate . sendMany (FIFO_STOPS_PROCESSING_ON_ACK_ERROR_ERROR_QUEUE_NAME ,
277+ createMessagesFromValues ( messageGroupId , values ) );
268278 assertThat (latchContainer .stopsProcessingOnAckErrorLatch1 .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
269279 .isTrue ();
270280 logger .debug ("Messages consumed before error: {}" , messagesContainer .stopsProcessingOnAckErrorBeforeThrown );
@@ -289,10 +299,12 @@ void receivesBatchesManyGroups() throws Exception {
289299 String messageGroupId1 = UUID .randomUUID ().toString ();
290300 String messageGroupId2 = UUID .randomUUID ().toString ();
291301 String messageGroupId3 = UUID .randomUUID ().toString ();
292- String queueUrl = fetchQueueUrl (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME );
293- sendMessageTo (queueUrl , values , messageGroupId1 );
294- sendMessageTo (queueUrl , values , messageGroupId2 );
295- sendMessageTo (queueUrl , values , messageGroupId3 );
302+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
303+ createMessagesFromValues (messageGroupId1 , values ));
304+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
305+ createMessagesFromValues (messageGroupId2 , values ));
306+ sqsTemplate .sendMany (FIFO_RECEIVES_BATCHES_MANY_GROUPS_QUEUE_NAME ,
307+ createMessagesFromValues (messageGroupId3 , values ));
296308 assertThat (latchContainer .receivesBatchManyGroupsLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
297309 .isTrue ();
298310 assertThat (receivesBatchesFromManyGroupsListener .receivedMessages .get (messageGroupId1 ))
@@ -305,21 +317,21 @@ void receivesBatchesManyGroups() throws Exception {
305317
306318 @ Test
307319 void manuallyCreatesContainer () throws Exception {
308- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME );
309320 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
310321 .collect (toList ());
311- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
322+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_CONTAINER_QUEUE_NAME ,
323+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
312324 assertThat (latchContainer .manuallyCreatedContainerLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
313325 .isTrue ();
314326 assertThat (messagesContainer .manuallyCreatedContainerMessages ).containsExactlyElementsOf (values );
315327 }
316328
317329 @ Test
318330 void manuallyCreatesBatchContainer () throws Exception {
319- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME );
320331 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
321332 .collect (toList ());
322- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
333+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_BATCH_CONTAINER_QUEUE_NAME ,
334+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
323335 assertThat (
324336 latchContainer .manuallyCreatedBatchContainerLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
325337 .isTrue ();
@@ -328,27 +340,39 @@ void manuallyCreatesBatchContainer() throws Exception {
328340
329341 @ Test
330342 void manuallyCreatesFactory () throws Exception {
331- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME );
332343 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
333344 .collect (toList ());
334- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
345+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_FACTORY_QUEUE_NAME ,
346+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
335347 assertThat (latchContainer .manuallyCreatedFactoryLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
336348 .isTrue ();
337349 assertThat (messagesContainer .manuallyCreatedFactoryMessages ).containsExactlyElementsOf (values );
338350 }
339351
340352 @ Test
341353 void manuallyCreatesBatchFactory () throws Exception {
342- String queueUrl = fetchQueueUrl (FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME );
343354 List <String > values = IntStream .range (0 , this .settings .messagesPerTest ).mapToObj (String ::valueOf )
344355 .collect (toList ());
345- sendMessageTo (queueUrl , values , UUID .randomUUID ().toString ());
356+ sqsTemplate .sendMany (FIFO_MANUALLY_CREATE_BATCH_FACTORY_QUEUE_NAME ,
357+ createMessagesFromValues (UUID .randomUUID ().toString (), values ));
346358 assertThat (
347359 latchContainer .manuallyCreatedBatchFactoryLatch .await (settings .latchTimeoutSeconds , TimeUnit .SECONDS ))
348360 .isTrue ();
349361 assertThat (messagesContainer .manuallyCreatedBatchFactoryMessages ).containsExactlyElementsOf (values );
350362 }
351363
364+ private Message <String > createMessage (String body , String messageGroupId ) {
365+ return MessageBuilder .withPayload (body )
366+ .setHeader (SqsHeaders .MessageSystemAttributes .SQS_MESSAGE_GROUP_ID_HEADER , messageGroupId )
367+ .setHeader (SqsHeaders .MessageSystemAttributes .SQS_MESSAGE_DEDUPLICATION_ID_HEADER ,
368+ UUID .randomUUID ().toString ())
369+ .build ();
370+ }
371+
372+ private List <Message <String >> createMessagesFromValues (String messageGroupId , List <String > values ) {
373+ return values .stream ().map (value -> createMessage (value , messageGroupId )).toList ();
374+ }
375+
352376 static class ReceivesMessageInOrderListener {
353377
354378 List <String > receivedMessages = Collections .synchronizedList (new ArrayList <>());
@@ -466,57 +490,7 @@ void listen(List<Message<String>> messages) {
466490 messages .forEach (msg -> latchContainer .receivesBatchManyGroupsLatch .countDown ());
467491 logger .trace ("Finished processing messages {} for group id {}" , values , messageGroupId );
468492 }
469- }
470493
471- private void sendMessageTo (String queueUrl , List <String > messageBodies , String messageGroupId ) {
472- try {
473- if (useLocalStackClient ) {
474- sendManyTo (queueUrl , messageBodies , messageGroupId ).join ();
475- }
476- else {
477- sendManyTo (queueUrl , messageBodies , messageGroupId );
478- }
479- }
480- catch (Exception e ) {
481- logger .error ("Error sending messages to queue {}" , queueUrl , e );
482- throw (RuntimeException ) e ;
483- }
484- }
485-
486- private CompletableFuture <Void > sendManyTo (String queueUrl , List <String > messageBodies , String messageGroupId ) {
487- return IntStream .range (0 , (int ) Math .ceil (messageBodies .size () / 10. ))
488- .mapToObj (index -> messageBodies .subList (index * 10 , Math .min ((index + 1 ) * 10 , messageBodies .size ())))
489- .reduce (CompletableFuture .completedFuture (null ), (previousFuture , messages ) -> previousFuture
490- .thenCompose (theVoid -> doSendMessageTo (queueUrl , messages , messageGroupId ).thenRun (() -> {
491- })), (a , b ) -> a );
492- }
493-
494- AtomicInteger messagesSent = new AtomicInteger ();
495-
496- private CompletableFuture <SendMessageBatchResponse > doSendMessageTo (String queueUrl , List <String > messageBodies ,
497- String messageGroupId ) {
498- return sqsAsyncClient .sendMessageBatch (req -> req
499- .entries (messageBodies .stream ().map (body -> createEntry (body , messageGroupId )).collect (toList ()))
500- .queueUrl (queueUrl ).build ()).whenComplete ((v , t ) -> {
501- if (t != null ) {
502- logger .error ("Error sending messages" , t );
503- }
504- else {
505- int sent = messagesSent .addAndGet (messageBodies .size ());
506- if (sent % 1000 == 0 ) {
507- logger .debug ("Sent {} messages to queue {}" , sent , queueUrl );
508- }
509- }
510- });
511- }
512-
513- private SendMessageBatchRequestEntry createEntry (String body , String messageGroupId ) {
514- return SendMessageBatchRequestEntry .builder ().messageBody (body ).id (UUID .randomUUID ().toString ())
515- .messageGroupId (messageGroupId ).messageDeduplicationId (UUID .randomUUID ().toString ()).build ();
516- }
517-
518- private String fetchQueueUrl (String receivesMessageQueueName ) throws InterruptedException , ExecutionException {
519- return this .sqsAsyncClient .getQueueUrl (req -> req .queueName (receivesMessageQueueName )).get ().queueUrl ();
520494 }
521495
522496 static class LatchContainer {
@@ -799,9 +773,10 @@ ObjectMapper objectMapper() {
799773 return new ObjectMapper ();
800774 }
801775
802- @ Bean (name = TEST_SQS_ASYNC_CLIENT_BEAN_NAME )
803- SqsAsyncClient sqsAsyncClientProducer () {
804- return BaseSqsIntegrationTest .createHighThroughputAsyncClient ();
776+ @ Bean
777+ SqsTemplate sqsTemplate () {
778+ return SqsTemplate .builder ().sqsAsyncClient (BaseSqsIntegrationTest .createHighThroughputAsyncClient ())
779+ .build ();
805780 }
806781
807782 }
0 commit comments