@@ -272,7 +272,8 @@ public void startProcessingSharedSubscriptions(ClientSessionCtx clientSessionCtx
272
272
applicationPubRelMsgCtx ,
273
273
clientSessionCtx ,
274
274
persistedMsgCtx ,
275
- publishProtoMessages );
275
+ publishProtoMessages ,
276
+ subscription );
276
277
submitStrategy .init (messagesToDeliver );
277
278
278
279
if (isDebugEnabled ) {
@@ -286,7 +287,7 @@ public void startProcessingSharedSubscriptions(ClientSessionCtx clientSessionCtx
286
287
int totalPubRelMsgs = ctx .getPubRelPendingMsgMap ().size ();
287
288
cachePackProcessingCtx (clientId , subscription , ctx );
288
289
289
- process (submitStrategy , clientSessionCtx , clientId , subscription );
290
+ process (submitStrategy , clientSessionCtx , clientId );
290
291
291
292
if (isJobActive (job )) {
292
293
ctx .await (packProcessingTimeout , TimeUnit .MILLISECONDS );
@@ -330,12 +331,14 @@ public void startProcessingSharedSubscriptions(ClientSessionCtx clientSessionCtx
330
331
private List <PersistedMsg > getMessagesToDeliver (ApplicationPubRelMsgCtx applicationPubRelMsgCtx ,
331
332
ClientSessionCtx clientSessionCtx ,
332
333
ApplicationPersistedMsgCtx persistedMsgCtx ,
333
- List <TbProtoQueueMsg <PublishMsgProto >> publishProtoMessages ) {
334
+ List <TbProtoQueueMsg <PublishMsgProto >> publishProtoMessages ,
335
+ TopicSharedSubscription subscription ) {
334
336
List <PersistedPubRelMsg > pubRelMessagesToDeliver = applicationPubRelMsgCtx .toSortedPubRelMessagesToDeliver ();
335
337
List <PersistedPublishMsg > publishMessagesToDeliver = toPublishMessagesToDeliver (
336
338
clientSessionCtx ,
337
339
persistedMsgCtx ,
338
- publishProtoMessages
340
+ publishProtoMessages ,
341
+ subscription
339
342
);
340
343
341
344
return collectMessagesToDeliver (pubRelMessagesToDeliver , publishMessagesToDeliver );
@@ -654,7 +657,8 @@ private void processPersistedMessages(TbQueueControlledOffsetConsumer<TbProtoQue
654
657
applicationPubRelMsgCtx ,
655
658
clientSessionCtx ,
656
659
persistedMsgCtx ,
657
- publishProtoMessages );
660
+ publishProtoMessages ,
661
+ null );
658
662
submitStrategy .init (messagesToDeliver );
659
663
660
664
applicationPubRelMsgCtx = new ApplicationPubRelMsgCtx (Sets .newConcurrentHashSet ());
@@ -664,7 +668,7 @@ private void processPersistedMessages(TbQueueControlledOffsetConsumer<TbProtoQue
664
668
int totalPubRelMsgs = ctx .getPubRelPendingMsgMap ().size ();
665
669
packProcessingCtxMap .put (clientId , ctx );
666
670
667
- process (submitStrategy , clientSessionCtx , clientId , null );
671
+ process (submitStrategy , clientSessionCtx , clientId );
668
672
669
673
if (isClientConnected (sessionId , clientState )) {
670
674
ctx .await (packProcessingTimeout , TimeUnit .MILLISECONDS );
@@ -695,17 +699,15 @@ private void processPersistedMessages(TbQueueControlledOffsetConsumer<TbProtoQue
695
699
}
696
700
}
697
701
698
- private void process (ApplicationSubmitStrategy submitStrategy , ClientSessionCtx clientSessionCtx ,
699
- String clientId , TopicSharedSubscription topicSharedSubscription ) {
702
+ private void process (ApplicationSubmitStrategy submitStrategy , ClientSessionCtx clientSessionCtx , String clientId ) {
700
703
if (isDebugEnabled ) {
701
704
log .debug ("[{}] Start sending the pack of messages from processing ctx: {}" , clientId , submitStrategy .getOrderedMessages ());
702
705
}
703
706
submitStrategy .process (msg -> {
704
707
switch (msg .getPacketType ()) {
705
708
case PUBLISH :
706
709
PublishMsg publishMsg = ((PersistedPublishMsg ) msg ).getPublishMsg ();
707
- int minQoSValue = getMinQoSValue (topicSharedSubscription , publishMsg );
708
- publishMsgDeliveryService .sendPublishMsgToClientWithoutFlush (clientSessionCtx , publishMsg .toBuilder ().qosLevel (minQoSValue ).build ());
710
+ publishMsgDeliveryService .sendPublishMsgToClientWithoutFlush (clientSessionCtx , publishMsg );
709
711
break ;
710
712
case PUBREL :
711
713
publishMsgDeliveryService .sendPubRelMsgToClientWithoutFlush (clientSessionCtx , msg .getPacketId ());
@@ -716,9 +718,8 @@ private void process(ApplicationSubmitStrategy submitStrategy, ClientSessionCtx
716
718
clientSessionCtx .getChannel ().flush ();
717
719
}
718
720
719
- private int getMinQoSValue (TopicSharedSubscription subscription , PublishMsg publishMsg ) {
720
- return subscription == null ? publishMsg .getQosLevel () :
721
- Math .min (subscription .getQos (), publishMsg .getQosLevel ());
721
+ private int getMinQoSValue (TopicSharedSubscription subscription , int publishMsgQos ) {
722
+ return subscription == null ? publishMsgQos : Math .min (subscription .getQos (), publishMsgQos );
722
723
}
723
724
724
725
private List <PersistedMsg > collectMessagesToDeliver (List <PersistedPubRelMsg > pubRelMessagesToDeliver ,
@@ -743,23 +744,22 @@ private ApplicationPubRelMsgCtx persistedMsgCtxToPubRelMsgCtx(ApplicationPersist
743
744
744
745
private List <PersistedPublishMsg > toPublishMessagesToDeliver (ClientSessionCtx clientSessionCtx ,
745
746
ApplicationPersistedMsgCtx persistedMsgCtx ,
746
- List <TbProtoQueueMsg <PublishMsgProto >> publishProtoMessages ) {
747
+ List <TbProtoQueueMsg <PublishMsgProto >> publishProtoMessages ,
748
+ TopicSharedSubscription subscription ) {
747
749
List <PersistedPublishMsg > result = new ArrayList <>(publishProtoMessages .size ());
748
750
for (TbProtoQueueMsg <PublishMsgProto > msg : publishProtoMessages ) {
749
751
var msgPacketId = persistedMsgCtx .getMsgPacketId (msg .getOffset ());
750
752
int packetId = msgPacketId != null ? msgPacketId : clientSessionCtx .getMsgIdSeq ().nextMsgId ();
751
753
boolean isDup = msgPacketId != null ;
752
- PublishMsg publishMsg = toPubMsg (msg .getValue (), packetId , isDup );
754
+ int minQoSValue = getMinQoSValue (subscription , msg .getValue ().getQos ());
755
+ PublishMsg publishMsg = toPubMsg (msg .getValue (), packetId , minQoSValue , isDup );
753
756
result .add (new PersistedPublishMsg (publishMsg , msg .getOffset ()));
754
757
}
755
758
return result ;
756
759
}
757
760
758
- private PublishMsg toPubMsg (PublishMsgProto persistedMsgProto , int packetId , boolean isDup ) {
759
- return ProtoConverter .convertToPublishMsg (persistedMsgProto ).toBuilder ()
760
- .packetId (packetId )
761
- .isDup (isDup )
762
- .build ();
761
+ private PublishMsg toPubMsg (PublishMsgProto persistedMsgProto , int packetId , int qos , boolean isDup ) {
762
+ return ProtoConverter .convertToPublishMsg (persistedMsgProto , packetId , qos , isDup );
763
763
}
764
764
765
765
private boolean isClientConnected (UUID sessionId , ClientActorStateInfo clientState ) {
0 commit comments