@@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
192
192
{error , Error } -> {stop , Error , NotStarted }
193
193
end ;
194
194
195
- handle_call ({deliver , Delivery , true }, From , State ) ->
196
- % % Synchronous, "mandatory" deliver mode.
197
- gen_server2 :reply (From , ok ),
198
- noreply (maybe_enqueue_message (Delivery , State ));
199
-
200
195
handle_call ({gm_deaths , LiveGMPids }, From ,
201
196
State = # state { q = Q = # amqqueue { name = QName , pid = MPid }}) ->
202
197
Self = self (),
@@ -464,16 +459,25 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
464
459
backing_queue_state = BQS }) ->
465
460
State # state { backing_queue_state = BQ :invoke (Mod , Fun , BQS ) }.
466
461
467
- send_or_record_confirm (_ , # delivery { msg_seq_no = undefined }, MS , _State ) ->
462
+ send_mandatory (# delivery {mandatory = false }) ->
463
+ ok ;
464
+ send_mandatory (# delivery {mandatory = true ,
465
+ sender = SenderPid ,
466
+ msg_seq_no = MsgSeqNo }) ->
467
+ gen_server2 :cast (SenderPid , {mandatory_received , MsgSeqNo }).
468
+
469
+ send_or_record_confirm (_ , # delivery { confirm = false }, MS , _State ) ->
468
470
MS ;
469
471
send_or_record_confirm (published , # delivery { sender = ChPid ,
472
+ confirm = true ,
470
473
msg_seq_no = MsgSeqNo ,
471
474
message = # basic_message {
472
475
id = MsgId ,
473
476
is_persistent = true } },
474
477
MS , # state { q = # amqqueue { durable = true } }) ->
475
478
dict :store (MsgId , {published , ChPid , MsgSeqNo } , MS );
476
479
send_or_record_confirm (_Status , # delivery { sender = ChPid ,
480
+ confirm = true ,
477
481
msg_seq_no = MsgSeqNo },
478
482
MS , _State ) ->
479
483
ok = rabbit_misc :confirm_to_sender (ChPid , [MsgSeqNo ]),
@@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
609
613
(_Msgid , _Status , MTC0 ) ->
610
614
MTC0
611
615
end , gb_trees :empty (), MS ),
612
- Deliveries = [Delivery ||
616
+ Deliveries = [Delivery # delivery { mandatory = false } || % % [0]
613
617
{_ChPid , {PubQ , _PendCh , _ChState }} <- dict :to_list (SQ ),
614
618
Delivery <- queue :to_list (PubQ )],
615
619
AwaitGmDown = [ChPid || {ChPid , {_ , _ , down_from_ch }} <- dict :to_list (SQ )],
@@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
621
625
Q1 , rabbit_mirror_queue_master , MasterState , RateTRef , Deliveries , KS1 ,
622
626
MTC ).
623
627
628
+ % % [0] We reset mandatory to false here because we will have sent the
629
+ % % mandatory_received already as soon as we got the message
630
+
624
631
noreply (State ) ->
625
632
{NewState , Timeout } = next_state (State ),
626
633
{noreply , ensure_rate_timer (NewState ), Timeout }.
@@ -736,6 +743,7 @@ maybe_enqueue_message(
736
743
Delivery = # delivery { message = # basic_message { id = MsgId },
737
744
sender = ChPid },
738
745
State = # state { sender_queues = SQ , msg_id_status = MS }) ->
746
+ send_mandatory (Delivery ), % % must do this before confirms
739
747
State1 = ensure_monitoring (ChPid , State ),
740
748
% % We will never see {published, ChPid, MsgSeqNo} here.
741
749
case dict :find (MsgId , MS ) of
0 commit comments