@@ -35,7 +35,7 @@ public class StompClient {
3535
3636 private Disposable mMessagesDisposable ;
3737 private Disposable mLifecycleDisposable ;
38- private Map <String , Set <FlowableEmitter <? super StompMessage >>> mEmitters = new ConcurrentHashMap <>();
38+ private Map <String , Set <FlowableEmitter <? super StompMessage >>> mEmitters = Collections . synchronizedMap ( new HashMap <>() );
3939 private List <ConnectableFlowable <Void >> mWaitConnectionFlowables ;
4040 private final ConnectionProvider mConnectionProvider ;
4141 private HashMap <String , String > mTopics ;
@@ -141,12 +141,14 @@ public Flowable<Void> send(StompMessage stompMessage) {
141141
142142 private void callSubscribers (StompMessage stompMessage ) {
143143 String messageDestination = stompMessage .findHeader (StompHeader .DESTINATION );
144- for (String dest : mEmitters .keySet ()) {
145- if (dest .equals (messageDestination )) {
146- for (FlowableEmitter <? super StompMessage > subscriber : mEmitters .get (dest )) {
147- subscriber .onNext (stompMessage );
144+ synchronized (mEmitters ) {
145+ for (String dest : mEmitters .keySet ()) {
146+ if (dest .equals (messageDestination )) {
147+ for (FlowableEmitter <? super StompMessage > subscriber : mEmitters .get (dest )) {
148+ subscriber .onNext (stompMessage );
149+ }
150+ return ;
148151 }
149- return ;
150152 }
151153 }
152154 }
@@ -167,27 +169,31 @@ public Flowable<StompMessage> topic(String destinationPath) {
167169
168170 public Flowable <StompMessage > topic (String destinationPath , List <StompHeader > headerList ) {
169171 return Flowable .<StompMessage >create (emitter -> {
170- Set <FlowableEmitter <? super StompMessage >> emittersSet = mEmitters .get (destinationPath );
171- if (emittersSet == null ) {
172- emittersSet = new HashSet <>();
173- mEmitters .put (destinationPath , emittersSet );
174- subscribePath (destinationPath , headerList ).subscribe ();
172+ synchronized (mEmitters ) {
173+ Set <FlowableEmitter <? super StompMessage >> emittersSet = mEmitters .get (destinationPath );
174+ if (emittersSet == null ) {
175+ emittersSet = new HashSet <>();
176+ mEmitters .put (destinationPath , emittersSet );
177+ subscribePath (destinationPath , headerList ).subscribe ();
178+ }
179+ emittersSet .add (emitter );
175180 }
176- emittersSet .add (emitter );
177181 }, BackpressureStrategy .BUFFER )
178182 .doOnCancel (() -> {
179- Iterator <String > mapIterator = mEmitters .keySet ().iterator ();
180- while (mapIterator .hasNext ()) {
181- String destinationUrl = mapIterator .next ();
182- Set <FlowableEmitter <? super StompMessage >> set = mEmitters .get (destinationUrl );
183- Iterator <FlowableEmitter <? super StompMessage >> setIterator = set .iterator ();
184- while (setIterator .hasNext ()) {
185- FlowableEmitter <? super StompMessage > subscriber = setIterator .next ();
186- if (subscriber .isCancelled ()) {
187- setIterator .remove ();
188- if (set .size () < 1 ) {
189- mapIterator .remove ();
190- unsubscribePath (destinationUrl ).subscribe ();
183+ synchronized (mEmitters ) {
184+ Iterator <String > mapIterator = mEmitters .keySet ().iterator ();
185+ while (mapIterator .hasNext ()) {
186+ String destinationUrl = mapIterator .next ();
187+ Set <FlowableEmitter <? super StompMessage >> set = mEmitters .get (destinationUrl );
188+ Iterator <FlowableEmitter <? super StompMessage >> setIterator = set .iterator ();
189+ while (setIterator .hasNext ()) {
190+ FlowableEmitter <? super StompMessage > subscriber = setIterator .next ();
191+ if (subscriber .isCancelled ()) {
192+ setIterator .remove ();
193+ if (set .size () < 1 ) {
194+ mapIterator .remove ();
195+ unsubscribePath (destinationUrl ).subscribe ();
196+ }
191197 }
192198 }
193199 }
0 commit comments