Skip to content

Commit 8c59478

Browse files
committed
Ack messages consumed from quorum queues
[#154472221]
1 parent c272d87 commit 8c59478

File tree

3 files changed

+62
-20
lines changed

3 files changed

+62
-20
lines changed

src/rabbit_amqqueue.erl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -992,8 +992,18 @@ purge(#amqqueue{ pid = QPid }) ->
992992
requeue(QPid, MsgIds, ChPid) ->
993993
delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}).
994994

995-
ack(QPid, MsgIds, ChPid) ->
996-
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}).
995+
ack(QPid, MsgIds, ChPid) when is_pid(QPid) ->
996+
delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]});
997+
ack(Id, CTagsMsgIds, ChPid) ->
998+
%% TODO Basic get doesn't have a consumer tag, we have to fake it again
999+
[{ok, _, _} = ra:send_and_await_consensus(Id, {settle, MsgId, {quorum_ctag(CTag), ChPid}})
1000+
|| {CTag, MsgId} <- CTagsMsgIds],
1001+
ok.
1002+
1003+
quorum_ctag(none) ->
1004+
<<"ctag">>;
1005+
quorum_ctag(Other) ->
1006+
Other.
9971007

9981008
reject(QPid, Requeue, MsgIds, ChPid) ->
9991009
delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}).

src/rabbit_channel.erl

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ handle_info({ra_fifo, {RegisteredName, _} = Id, {delivery, CTag, MsgId, Msg}},
616616
ok
617617
end,
618618
noreply(handle_deliver(CTag, AckRequired,
619-
{Id, whereis(RegisteredName), MsgId, true, Msg}, State));
619+
{RegisteredName, Id, MsgId, true, Msg}, State));
620620

621621
handle_info({bump_credit, Msg}, State) ->
622622
%% A rabbit_amqqueue_process is granting credit to our channel. If
@@ -1782,13 +1782,17 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
17821782

17831783
foreach_per_queue(_F, []) ->
17841784
ok;
1785-
foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
1785+
foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) when is_pid(QPid) -> %% common case
17861786
F(QPid, [MsgId]);
1787+
foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% quorum queue, needs the consumer tag
1788+
F(QPid, [{CTag, MsgId}]);
17871789
%% NB: UAL should be in youngest-first order; the tree values will
17881790
%% then be in oldest-first order
17891791
foreach_per_queue(F, UAL) ->
1790-
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
1791-
rabbit_misc:gb_trees_cons(QPid, MsgId, T)
1792+
T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) when is_pid(QPid) ->
1793+
rabbit_misc:gb_trees_cons(QPid, MsgId, T);
1794+
({_DTag, CTag, {QPid, MsgId}}, T) ->
1795+
rabbit_misc:gb_trees_cons(QPid, {CTag, MsgId}, T)
17921796
end, gb_trees:empty(), UAL),
17931797
rabbit_misc:gb_trees_foreach(F, T).
17941798

test/quorum_queue_SUITE.erl

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,9 @@ groups() ->
4242
consume_from_empty_queue,
4343
consume_and_autoack_from_queue,
4444
subscribe_to_queue,
45-
subscribe_with_autoack_to_queue
45+
subscribe_with_autoack_to_queue,
46+
consume_and_ack_from_queue,
47+
subscribe_and_ack_to_queue
4648
]}
4749
].
4850

@@ -270,8 +272,6 @@ publish_and_restart(Config) ->
270272
wait_for_messages(Config, QQ, <<"2">>, <<"2">>, <<"0">>).
271273

272274
consume_from_queue(Config) ->
273-
%% Test the node restart with both types of queues (quorum and classic) to
274-
%% ensure there are no regressions
275275
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
276276

277277
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -287,8 +287,6 @@ consume_from_queue(Config) ->
287287
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>).
288288

289289
consume_and_autoack_from_queue(Config) ->
290-
%% Test the node restart with both types of queues (quorum and classic) to
291-
%% ensure there are no regressions
292290
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
293291

294292
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -304,8 +302,6 @@ consume_and_autoack_from_queue(Config) ->
304302
wait_for_messages(Config, QQ, <<"0">>, <<"0">>, <<"0">>).
305303

306304
consume_from_empty_queue(Config) ->
307-
%% Test the node restart with both types of queues (quorum and classic) to
308-
%% ensure there are no regressions
309305
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
310306

311307
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -316,8 +312,6 @@ consume_from_empty_queue(Config) ->
316312
consume_empty(Ch, QQ, false).
317313

318314
subscribe_to_queue(Config) ->
319-
%% Test the node restart with both types of queues (quorum and classic) to
320-
%% ensure there are no regressions
321315
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
322316

323317
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -337,8 +331,6 @@ subscribe_to_queue(Config) ->
337331
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>).
338332

339333
subscribe_with_autoack_to_queue(Config) ->
340-
%% Test the node restart with both types of queues (quorum and classic) to
341-
%% ensure there are no regressions
342334
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
343335

344336
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -362,6 +354,41 @@ subscribe_with_autoack_to_queue(Config) ->
362354
rabbit_ct_client_helpers:close_channel(Ch),
363355
wait_for_messages(Config, QQ, <<"0">>, <<"0">>, <<"0">>).
364356

357+
consume_and_ack_from_queue(Config) ->
358+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
359+
360+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
361+
QQ = <<"quorum-q">>,
362+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
363+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
364+
365+
publish(Ch, QQ),
366+
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>),
367+
%% TODO we don't store consumer tag for basic.get!!! could it be fixed?
368+
DeliveryTag = consume(Ch, QQ, false),
369+
wait_for_messages(Config, QQ, <<"1">>, <<"0">>, <<"1">>),
370+
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
371+
wait_for_messages(Config, QQ, <<"0">>, <<"0">>, <<"0">>).
372+
373+
subscribe_and_ack_to_queue(Config) ->
374+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
375+
376+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
377+
QQ = <<"quorum-q">>,
378+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
379+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
380+
381+
publish(Ch, QQ),
382+
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>),
383+
subscribe(Ch, QQ, false),
384+
receive
385+
{#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
386+
ok
387+
end,
388+
wait_for_messages(Config, QQ, <<"1">>, <<"0">>, <<"1">>),
389+
amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
390+
wait_for_messages(Config, QQ, <<"0">>, <<"0">>, <<"0">>).
391+
365392
%%----------------------------------------------------------------------------
366393

367394
declare(Ch, Q) ->
@@ -408,9 +435,10 @@ publish(Ch, Queue) ->
408435
payload = <<"msg">>}).
409436

410437
consume(Ch, Queue, NoAck) ->
411-
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}},
412-
amqp_channel:call(Ch, #'basic.get'{queue = Queue,
413-
no_ack = NoAck})).
438+
{GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
439+
no_ack = NoAck}),
440+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, Reply),
441+
GetOk#'basic.get_ok'.delivery_tag.
414442

415443
consume_empty(Ch, Queue, NoAck) ->
416444
?assertMatch(#'basic.get_empty'{},

0 commit comments

Comments
 (0)