@@ -66,6 +66,7 @@ struct _zctx_t {
6666 int pipehwm ; // Send/receive HWM for pipes
6767 int sndhwm ; // ZMQ_SNDHWM for normal sockets
6868 int rcvhwm ; // ZMQ_RCVHWM for normal sockets
69+ zmutex_t * socketsMutex ; // Synchronizes access to socket zlist
6970};
7071
7172
@@ -105,6 +106,7 @@ zctx_new (void)
105106 self -> sndhwm = 1000 ;
106107 self -> rcvhwm = 1000 ;
107108 self -> main = true;
109+ self -> socketsMutex = zmutex_new ();
108110 zsys_handler_set (s_signal_handler );
109111 return self ;
110112}
@@ -119,11 +121,14 @@ zctx_destroy (zctx_t **self_p)
119121 assert (self_p );
120122 if (* self_p ) {
121123 zctx_t * self = * self_p ;
124+ zmutex_lock (self -> socketsMutex );
122125 while (zlist_size (self -> sockets ))
123126 zctx__socket_destroy (self , zlist_first (self -> sockets ));
124127 zlist_destroy (& self -> sockets );
128+ zmutex_unlock (self -> socketsMutex );
125129 if (self -> main && self -> context )
126130 zmq_term (self -> context );
131+ zmutex_destroy (& self -> socketsMutex );
127132 free (self );
128133 * self_p = NULL ;
129134 }
@@ -263,10 +268,13 @@ zctx__socket_new (zctx_t *self, int type)
263268 zsocket_set_sndhwm (zocket , self -> sndhwm );
264269 zsocket_set_rcvhwm (zocket , self -> rcvhwm );
265270#endif
271+ zmutex_lock (self -> socketsMutex );
266272 if (zlist_push (self -> sockets , zocket )) {
267273 zmq_close (zocket );
274+ zmutex_unlock (self -> socketsMutex );
268275 return NULL ;
269276 }
277+ zmutex_unlock (self -> socketsMutex );
270278 return zocket ;
271279}
272280
@@ -296,7 +304,9 @@ zctx__socket_destroy (zctx_t *self, void *zocket)
296304 zsocket_set_linger (zocket , self -> linger );
297305 zmq_close (zocket );
298306 }
307+ zmutex_lock (self -> socketsMutex );
299308 zlist_remove (self -> sockets , zocket );
309+ zmutex_unlock (self -> socketsMutex );
300310}
301311
302312
0 commit comments