1717
1818import static java .util .Collections .singletonMap ;
1919import static org .assertj .core .api .Assertions .assertThat ;
20+ import static org .junit .jupiter .api .Assertions .assertDoesNotThrow ;
2021
2122import com .fasterxml .jackson .databind .ObjectMapper ;
2223import io .awspring .cloud .sqs .CompletableFutures ;
5354import java .util .Collections ;
5455import java .util .List ;
5556import java .util .UUID ;
57+ import java .util .concurrent .BrokenBarrierException ;
5658import java .util .concurrent .CompletableFuture ;
5759import java .util .concurrent .CountDownLatch ;
60+ import java .util .concurrent .CyclicBarrier ;
5861import java .util .concurrent .TimeUnit ;
5962import java .util .concurrent .atomic .AtomicBoolean ;
6063import java .util .stream .Collectors ;
@@ -116,6 +119,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {
116119
117120 static final String MANUALLY_CREATE_FACTORY_QUEUE_NAME = "manually_create_factory_test_queue" ;
118121
122+ static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue" ;
123+
119124 static final String LOW_RESOURCE_FACTORY = "lowResourceFactory" ;
120125
121126 static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory" ;
@@ -139,7 +144,8 @@ static void beforeTests() {
139144 createQueue (client , RESOLVES_PARAMETER_TYPES_QUEUE_NAME ,
140145 singletonMap (QueueAttributeName .VISIBILITY_TIMEOUT , "20" )),
141146 createQueue (client , MANUALLY_CREATE_CONTAINER_QUEUE_NAME ),
142- createQueue (client , MANUALLY_CREATE_FACTORY_QUEUE_NAME )).join ();
147+ createQueue (client , MANUALLY_CREATE_FACTORY_QUEUE_NAME ),
148+ createQueue (client , MAX_CONCURRENT_MESSAGES_QUEUE_NAME )).join ();
143149 }
144150
145151 @ Autowired
@@ -275,6 +281,20 @@ void manuallyCreatesFactory() throws Exception {
275281 assertThat (latchContainer .manuallyCreatedFactorySinkLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
276282 }
277283
284+ @ Test
285+ void maxConcurrentMessages () {
286+ List <Message <String >> messages1 = IntStream .range (0 , 10 )
287+ .mapToObj (index -> "maxConcurrentMessages-payload-" + index )
288+ .map (payload -> MessageBuilder .withPayload (payload ).build ()).collect (Collectors .toList ());
289+ List <Message <String >> messages2 = IntStream .range (10 , 20 )
290+ .mapToObj (index -> "maxConcurrentMessages-payload-" + index )
291+ .map (payload -> MessageBuilder .withPayload (payload ).build ()).collect (Collectors .toList ());
292+ sqsTemplate .sendManyAsync (MAX_CONCURRENT_MESSAGES_QUEUE_NAME , messages1 );
293+ sqsTemplate .sendManyAsync (MAX_CONCURRENT_MESSAGES_QUEUE_NAME , messages2 );
294+ logger .debug ("Sent messages to queue {} with messages {} and {}" , MAX_CONCURRENT_MESSAGES_QUEUE_NAME , messages1 , messages2 );
295+ assertDoesNotThrow (() -> latchContainer .maxConcurrentMessagesBarrier .await (10 , TimeUnit .SECONDS ));
296+ }
297+
278298 static class ReceivesMessageListener {
279299
280300 @ Autowired
@@ -399,6 +419,18 @@ void listen(Message<String> message, MessageHeaders headers, Acknowledgement ack
399419 }
400420 }
401421
422+ static class MaxConcurrentMessagesListener {
423+
424+ @ Autowired
425+ LatchContainer latchContainer ;
426+
427+ @ SqsListener (queueNames = MAX_CONCURRENT_MESSAGES_QUEUE_NAME , maxMessagesPerPoll = "10" , maxConcurrentMessages = "20" , id = "max-concurrent-messages" )
428+ void listen (String message ) throws BrokenBarrierException , InterruptedException {
429+ logger .debug ("Received message in Listener Method: " + message );
430+ latchContainer .maxConcurrentMessagesBarrier .await ();
431+ }
432+ }
433+
402434 static class LatchContainer {
403435
404436 final CountDownLatch receivesMessageLatch = new CountDownLatch (1 );
@@ -421,6 +453,7 @@ static class LatchContainer {
421453 final CountDownLatch acknowledgementCallbackSuccessLatch = new CountDownLatch (1 );
422454 final CountDownLatch acknowledgementCallbackBatchLatch = new CountDownLatch (1 );
423455 final CountDownLatch acknowledgementCallbackErrorLatch = new CountDownLatch (1 );
456+ final CyclicBarrier maxConcurrentMessagesBarrier = new CyclicBarrier (21 );
424457
425458 }
426459
@@ -612,6 +645,11 @@ ResolvesParameterTypesListener resolvesParameterTypesListener() {
612645 return new ResolvesParameterTypesListener ();
613646 }
614647
648+ @ Bean
649+ MaxConcurrentMessagesListener maxConcurrentMessagesListener () {
650+ return new MaxConcurrentMessagesListener ();
651+ }
652+
615653 @ Bean
616654 SqsListenerConfigurer customizer () {
617655 return registrar -> {
0 commit comments