Skip to content

Commit 74346ac

Browse files
committed
Autoack messages from quorum queues on basic.get
[#154472211]
1 parent faec336 commit 74346ac

File tree

2 files changed

+31
-7
lines changed

2 files changed

+31
-7
lines changed

src/rabbit_amqqueue.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,6 +1039,12 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, ChPid, N
10391039
{ra_event, _, machine, {msg, _, empty}} ->
10401040
empty;
10411041
{ra_event, _, machine, {msg, MsgId, Msg}} ->
1042+
case NoAck of
1043+
true ->
1044+
ok;
1045+
false ->
1046+
{ok, _, _} = ra:send_and_await_consensus(Id, {settle, MsgId, self()})
1047+
end,
10421048
{ok, quorum_messages(Name), {QName, Id, MsgId, false, Msg}}
10431049
end.
10441050

test/quorum_queue_SUITE.erl

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ groups() ->
3939
publish_to_queue,
4040
publish_and_restart,
4141
consume_from_queue,
42-
consume_from_empty_queue
42+
consume_from_empty_queue,
43+
consume_and_autoack_from_queue
4344
]}
4445
].
4546

@@ -278,11 +279,28 @@ consume_from_queue(Config) ->
278279

279280
publish(Ch, QQ),
280281
wait_for_messages(Config, 0, QQ, <<"1">>, <<"1">>, <<"0">>),
281-
consume(Ch, QQ),
282+
consume(Ch, QQ, true),
282283
wait_for_messages(Config, 0, QQ, <<"1">>, <<"0">>, <<"1">>),
283284
rabbit_ct_client_helpers:close_channel(Ch),
284285
wait_for_messages(Config, 0, QQ, <<"1">>, <<"1">>, <<"0">>).
285286

287+
consume_and_autoack_from_queue(Config) ->
288+
%% Test the node restart with both types of queues (quorum and classic) to
289+
%% ensure there are no regressions
290+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
291+
292+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
293+
QQ = <<"quorum-q">>,
294+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
295+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
296+
297+
publish(Ch, QQ),
298+
wait_for_messages(Config, 0, QQ, <<"1">>, <<"1">>, <<"0">>),
299+
consume(Ch, QQ, false),
300+
wait_for_messages(Config, 0, QQ, <<"0">>, <<"0">>, <<"0">>),
301+
rabbit_ct_client_helpers:close_channel(Ch),
302+
wait_for_messages(Config, 0, QQ, <<"0">>, <<"0">>, <<"0">>).
303+
286304
consume_from_empty_queue(Config) ->
287305
%% Test the node restart with both types of queues (quorum and classic) to
288306
%% ensure there are no regressions
@@ -293,7 +311,7 @@ consume_from_empty_queue(Config) ->
293311
?assertEqual({'queue.declare_ok', QQ, 0, 0},
294312
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
295313

296-
consume_empty(Ch, QQ).
314+
consume_empty(Ch, QQ, true).
297315

298316
%%----------------------------------------------------------------------------
299317

@@ -340,12 +358,12 @@ publish(Ch, Queue) ->
340358
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
341359
payload = <<"msg">>}).
342360

343-
consume(Ch, Queue) ->
361+
consume(Ch, Queue, NoAck) ->
344362
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
345363
amqp_channel:call(Ch, #'basic.get'{queue = Queue,
346-
no_ack = true})).
364+
no_ack = NoAck})).
347365

348-
consume_empty(Ch, Queue) ->
366+
consume_empty(Ch, Queue, NoAck) ->
349367
?assertMatch(#'basic.get_empty'{},
350368
amqp_channel:call(Ch, #'basic.get'{queue = Queue,
351-
no_ack = true})).
369+
no_ack = NoAck})).

0 commit comments

Comments
 (0)