@@ -26,6 +26,7 @@ public class BrokerController
2626 private readonly IMessageStore _messageStore ;
2727 private readonly IConsumeOffsetStore _consumeOffsetStore ;
2828 private readonly SuspendedPullRequestManager _suspendedPullRequestManager ;
29+ private readonly ProducerManager _producerManager ;
2930 private readonly ConsumerManager _consumerManager ;
3031 private readonly SocketRemotingServer _producerSocketRemotingServer ;
3132 private readonly SocketRemotingServer _consumerSocketRemotingServer ;
@@ -36,6 +37,10 @@ public class BrokerController
3637 private int _isCleaning = 0 ;
3738
3839 public BrokerSetting Setting { get ; private set ; }
40+ public ProducerManager ProducerManager
41+ {
42+ get { return _producerManager ; }
43+ }
3944 public ConsumerManager ConsumerManager
4045 {
4146 get { return _consumerManager ; }
@@ -52,6 +57,7 @@ public static BrokerController Instance
5257 private BrokerController ( BrokerSetting setting )
5358 {
5459 Setting = setting ?? new BrokerSetting ( ) ;
60+ _producerManager = ObjectContainer . Resolve < ProducerManager > ( ) ;
5561 _consumerManager = ObjectContainer . Resolve < ConsumerManager > ( ) ;
5662 _messageStore = ObjectContainer . Resolve < IMessageStore > ( ) ;
5763 _consumeOffsetStore = ObjectContainer . Resolve < IConsumeOffsetStore > ( ) ;
@@ -64,6 +70,7 @@ private BrokerController(BrokerSetting setting)
6470 _adminSocketRemotingServer = new SocketRemotingServer ( "EQueue.Broker.AdminRemotingServer" , Setting . AdminAddress , Setting . SocketSetting ) ;
6571
6672 _logger = ObjectContainer . Resolve < ILoggerFactory > ( ) . Create ( GetType ( ) . FullName ) ;
73+ _producerSocketRemotingServer . RegisterConnectionEventListener ( new ProducerConnectionEventListener ( this ) ) ;
6774 _consumerSocketRemotingServer . RegisterConnectionEventListener ( new ConsumerConnectionEventListener ( this ) ) ;
6875 RegisterRequestHandlers ( ) ;
6976
@@ -150,6 +157,7 @@ public BrokerController Start()
150157 _consumeOffsetStore . Start ( ) ;
151158 _messageStore . Start ( ) ;
152159 _queueStore . Start ( ) ;
160+ _producerManager . Start ( ) ;
153161 _consumerManager . Start ( ) ;
154162 _suspendedPullRequestManager . Start ( ) ;
155163 _consumerSocketRemotingServer . Start ( ) ;
@@ -172,6 +180,7 @@ public BrokerController Shutdown()
172180 _producerSocketRemotingServer . Shutdown ( ) ;
173181 _consumerSocketRemotingServer . Shutdown ( ) ;
174182 _adminSocketRemotingServer . Shutdown ( ) ;
183+ _producerManager . Shutdown ( ) ;
175184 _consumerManager . Shutdown ( ) ;
176185 _suspendedPullRequestManager . Shutdown ( ) ;
177186 _messageStore . Shutdown ( ) ;
@@ -189,6 +198,7 @@ public BrokerStatisticInfo GetBrokerStatisticInfo()
189198 statisticInfo . QueueCount = _queueStore . GetAllQueueCount ( ) ;
190199 statisticInfo . TotalUnConsumedMessageCount = _queueStore . GetTotalUnConusmedMessageCount ( ) ;
191200 statisticInfo . ConsumerGroupCount = _consumerManager . GetConsumerGroupCount ( ) ;
201+ statisticInfo . ProducerCount = _producerManager . GetProducerCount ( ) ;
192202 statisticInfo . ConsumerCount = _consumerManager . GetConsumerCount ( ) ;
193203 statisticInfo . MessageChunkCount = _messageStore . ChunkCount ;
194204 statisticInfo . MessageMinChunkNum = _messageStore . MinChunkNum ;
@@ -209,20 +219,22 @@ private void RemoveNotExistQueueConsumeOffsets()
209219 }
210220 private void RegisterRequestHandlers ( )
211221 {
222+ _producerSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . ProducerHeartbeat , new ProducerHeartbeatRequestHandler ( this ) ) ;
212223 _producerSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . SendMessage , new SendMessageRequestHandler ( this ) ) ;
213224
225+ _consumerSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . ConsumerHeartbeat , new ConsumerHeartbeatRequestHandler ( this ) ) ;
214226 _consumerSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . PullMessage , new PullMessageRequestHandler ( ) ) ;
215227
216228 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . GetTopicQueueIdsForProducer , new GetTopicQueueIdsForProducerRequestHandler ( ) ) ;
217229 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . GetTopicQueueIdsForConsumer , new GetTopicQueueIdsForConsumerRequestHandler ( ) ) ;
218230 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . QueryGroupConsumer , new QueryConsumerRequestHandler ( ) ) ;
219- _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . ConsumerHeartbeat , new ConsumerHeartbeatRequestHandler ( this ) ) ;
220231 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . UpdateQueueOffsetRequest , new UpdateQueueOffsetRequestHandler ( ) ) ;
221232
222233 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . QueryBrokerStatisticInfo , new QueryBrokerStatisticInfoRequestHandler ( ) ) ;
223234 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . CreateTopic , new CreateTopicRequestHandler ( ) ) ;
224235 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . DeleteTopic , new DeleteTopicRequestHandler ( ) ) ;
225236 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . QueryTopicQueueInfo , new QueryTopicQueueInfoRequestHandler ( ) ) ;
237+ _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . QueryProducerInfo , new QueryProducerInfoRequestHandler ( ) ) ;
226238 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . QueryConsumerInfo , new QueryConsumerInfoRequestHandler ( ) ) ;
227239 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . AddQueue , new AddQueueRequestHandler ( ) ) ;
228240 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . DeleteQueue , new DeleteQueueRequestHandler ( ) ) ;
@@ -231,6 +243,24 @@ private void RegisterRequestHandlers()
231243 _adminSocketRemotingServer . RegisterRequestHandler ( ( int ) RequestCode . GetMessageDetail , new GetMessageDetailRequestHandler ( ) ) ;
232244 }
233245
246+ class ProducerConnectionEventListener : IConnectionEventListener
247+ {
248+ private BrokerController _brokerController ;
249+
250+ public ProducerConnectionEventListener ( BrokerController brokerController )
251+ {
252+ _brokerController = brokerController ;
253+ }
254+
255+ public void OnConnectionAccepted ( ITcpConnection connection ) { }
256+ public void OnConnectionEstablished ( ITcpConnection connection ) { }
257+ public void OnConnectionFailed ( SocketError socketError ) { }
258+ public void OnConnectionClosed ( ITcpConnection connection , SocketError socketError )
259+ {
260+ var producerId = ClientIdFactory . CreateClientId ( connection . RemotingEndPoint as IPEndPoint ) ;
261+ _brokerController . _producerManager . RemoveProducer ( producerId ) ;
262+ }
263+ }
234264 class ConsumerConnectionEventListener : IConnectionEventListener
235265 {
236266 private BrokerController _brokerController ;
0 commit comments