Skip to content

Commit c62eed0

Browse files
committed
Support for publisher confirms in quorum queues
[#154472198]
1 parent 45a7bac commit c62eed0

File tree

3 files changed

+35
-9
lines changed

3 files changed

+35
-9
lines changed

src/rabbit_amqqueue.erl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1304,7 +1304,8 @@ deliver([], _Delivery) ->
13041304
%% /dev/null optimisation
13051305
[];
13061306

1307-
deliver(Qs, Delivery = #delivery{flow = Flow}) ->
1307+
deliver(Qs, Delivery = #delivery{flow = Flow,
1308+
confirm = Confirm}) ->
13081309
{Quorum, MPids, SPids} = qpids(Qs),
13091310
QPids = MPids ++ SPids,
13101311
%% We use up two credits to send to a slave since the message
@@ -1331,7 +1332,13 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
13311332
SMsg = {deliver, Delivery, true},
13321333
delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
13331334
delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
1334-
[ra:send(Q, {enqueue, Delivery#delivery.message}) || Q <- Quorum],
1335+
case Confirm of
1336+
false ->
1337+
[ra:send(Q, {enqueue, Delivery#delivery.message}) || Q <- Quorum];
1338+
true ->
1339+
[ra:send_and_notify(Q, {enqueue, Delivery#delivery.message},
1340+
Delivery#delivery.msg_seq_no) || Q <- Quorum]
1341+
end,
13351342
QPids ++ Quorum.
13361343

13371344
qpids([]) -> {[], [], []}; %% optimisation

src/rabbit_channel.erl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -596,10 +596,8 @@ handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
596596
%% NB: don't call noreply/1 since we don't want to send confirms.
597597
noreply_coalesce(record_rejects(MXs, State#ch{unconfirmed = UC1}));
598598

599-
handle_cast({confirm, MsgSeqNos, QPid}, State = #ch{unconfirmed = UC}) ->
600-
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
601-
%% NB: don't call noreply/1 since we don't want to send confirms.
602-
noreply_coalesce(record_confirms(MXs, State#ch{unconfirmed = UC1})).
599+
handle_cast({confirm, MsgSeqNos, QPid}, State) ->
600+
noreply_coalesce(confirm(MsgSeqNos, QPid, State)).
603601

604602
handle_info({ra_fifo, {RegisteredName, _} = Id, {delivery, CTag, MsgId, Msg}},
605603
#ch{consumer_mapping = ConsumerMapping} = State) ->
@@ -618,6 +616,9 @@ handle_info({ra_fifo, {RegisteredName, _} = Id, {delivery, CTag, MsgId, Msg}},
618616
noreply(handle_deliver(CTag, AckRequired,
619617
{RegisteredName, Id, MsgId, true, Msg}, State));
620618

619+
handle_info({ra_event, {applied, Id, MsgSeqNo}} = Evt, State) ->
620+
noreply_coalesce(confirm([MsgSeqNo], Id, State));
621+
621622
handle_info({bump_credit, Msg}, State) ->
622623
%% A rabbit_amqqueue_process is granting credit to our channel. If
623624
%% our channel was being blocked by this process, and no other
@@ -1888,6 +1889,11 @@ process_routing_confirm(true, QPids, MsgSeqNo, XName, State) ->
18881889
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
18891890
State#ch.unconfirmed)}.
18901891

1892+
confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
1893+
{MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
1894+
%% NB: don't call noreply/1 since we don't want to send confirms.
1895+
record_confirms(MXs, State#ch{unconfirmed = UC1}).
1896+
18911897
send_nacks([], State) ->
18921898
State;
18931899
send_nacks(_MXs, State = #ch{state = closing,

test/quorum_queue_SUITE.erl

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ groups() ->
4646
consume_and_ack,
4747
subscribe_and_ack,
4848
consume_and_single_nack,
49-
subscribe_and_single_nack
49+
subscribe_and_single_nack,
50+
publisher_confirms
5051
]}
5152
].
5253

@@ -242,8 +243,6 @@ stop_start_rabbit_app(Config) ->
242243
amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}).
243244

244245
publish(Config) ->
245-
%% Test the node restart with both types of queues (quorum and classic) to
246-
%% ensure there are no regressions
247246
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
248247

249248
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
@@ -430,6 +429,20 @@ subscribe_and_single_nack(Config) ->
430429
requeue = true}),
431430
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>).
432431

432+
publisher_confirms(Config) ->
433+
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
434+
435+
Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
436+
QQ = <<"quorum-q">>,
437+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
438+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
439+
440+
amqp_channel:call(Ch, #'confirm.select'{}),
441+
amqp_channel:register_confirm_handler(Ch, self()),
442+
publish(Ch, QQ),
443+
wait_for_messages(Config, QQ, <<"1">>, <<"1">>, <<"0">>),
444+
amqp_channel:wait_for_confirms(Ch).
445+
433446
%%----------------------------------------------------------------------------
434447

435448
declare(Ch, Q) ->

0 commit comments

Comments
 (0)