11package consumer .pool ;
22
33import consumer .Consumer ;
4- import consumer .MultiThreadsConsumer ;
54import consumer .cas .AbstractSPCASConsumer ;
65import consumer .cas .AbstractMPCASConsumer ;
76import consumer .cas .strategy .RetryStrategy ;
87import consumer .lifecycle .LifeCycle ;
98import consumer .lifecycle .StateCheckDelegate ;
9+ import consumer .pool .dispatch .DispatchStrategy ;
10+ import consumer .pool .dispatch .RoundRobinStrategy ;
11+ import consumer .pool .dispatch .SingleProducerStrategy ;
12+ import consumer .pool .internal .InternalMPConsumer ;
13+ import consumer .pool .internal .InternalSPConsumer ;
1014import org .slf4j .Logger ;
1115import consumer .util .Util ;
1216
2428public class DefaultConsumerPool <T > implements ConsumerPool <T > {
2529
2630 /**
27- * 每个单生产者队列消费线程数量 .
31+ * 每个消费者的消费线程数 .
2832 */
29- private final int spThreads ;
30- /**
31- * 单生产者队列大小.
32- */
33- private final int spQueueSize ;
34- /**
35- * 多生产者消费线程数.
36- */
37- private final int mpThreads ;
38- /**
39- * 多生产者队列大小.
40- */
41- private final int mpQueueSize ;
33+ private final int threads ;
34+ private final int queueSize ;
4235 private final ConsumeActionFactory <T > factory ;
43- /**
44- * 消费者列表,采取如下的存储策略:
45- * <p>最后一个为多生产者模型,前面的为但生产者模型.</p>
46- */
47- private final List <ConsumerWrapper > list ;
36+ private final boolean isSingleProducer ;
4837 private final StateCheckDelegate delegate = StateCheckDelegate .getInstance ();
49- private final Object monitor = new Object () ;
38+ private final List < Consumer < T >> list ;
5039 /**
5140 * 消费者总数.
5241 */
@@ -56,6 +45,7 @@ public class DefaultConsumerPool<T> implements ConsumerPool<T> {
5645 private RetryStrategy <T > retryStrategy ;
5746 private ThreadNameGenerator threadNameGenerator ;
5847 private Thread .UncaughtExceptionHandler handler ;
48+ private DispatchStrategy <T > dispatchStrategy ;
5949
6050 private volatile State state = State .INIT ;
6151
@@ -64,36 +54,47 @@ public class DefaultConsumerPool<T> implements ConsumerPool<T> {
6454 */
6555 private static final String defaultTerminateThreadName = "DefaultConsumerPool-terminate-thread" ;
6656
67- public DefaultConsumerPool (int sp , int spThreads , int mpThreads , int spQueueSize , int mpQueueSize , ConsumeActionFactory <T > factory ) {
57+ public DefaultConsumerPool (boolean isSingleProducer , int number , int consumerThreads , int queueSize , ConsumeActionFactory <T > factory ) {
6858 Objects .requireNonNull (factory );
69- this .mpThreads = mpThreads ;
70- this .spThreads = spThreads ;
7159 this .factory = factory ;
72- this .spQueueSize = spQueueSize ;
73- this .mpQueueSize = mpQueueSize ;
74- this .consumers = sp + 1 ;
75- this .list = new ArrayList <>(this .consumers );
60+ this .consumers = number ;
61+ this .queueSize = queueSize ;
62+ this .threads = consumerThreads ;
63+ this .isSingleProducer = isSingleProducer ;
64+ this .list = new ArrayList <>(consumers );
7665 }
7766
7867 @ Override
7968 public boolean start () {
8069 delegate .checkStart (this );
81- for (int i = 0 ; i < consumers - 1 ; i ++) {
82- InternalSPConsumer spConsumer = new InternalSPConsumer (spQueueSize , spThreads );
83- prepareConsumer (spConsumer );
84- if (!spConsumer .start ()) {
70+ if (isSingleProducer ) {
71+ //单生产者模式,我们需要number - 1个InternalSPConsumer和1个InternalMPConsumer
72+ for (int i = 0 ; i < consumers - 1 ; i ++) {
73+ InternalSPConsumer <T > spConsumer = new InternalSPConsumer <>(queueSize , threads , factory , threadNameGenerator );
74+ prepareConsumer (spConsumer );
75+ if (!spConsumer .start ()) {
76+ return false ;
77+ }
78+ list .add (spConsumer );
79+ }
80+ InternalMPConsumer <T > mpConsumer = new InternalMPConsumer <>(queueSize , threads , factory , threadNameGenerator );
81+ prepareConsumer (mpConsumer );
82+ if (!mpConsumer .start ()) {
8583 return false ;
8684 }
87- ConsumerWrapper wrapper = new ConsumerWrapper (spConsumer );
88- spConsumer .wrapper = wrapper ;
89- list .add (wrapper );
90- }
91- AbstractMPCASConsumer <T > mpConsumer = new InternalMPConsumer (mpQueueSize , mpThreads );
92- prepareConsumer (mpConsumer );
93- if (!mpConsumer .start ()) {
94- return false ;
85+ list .add (mpConsumer );
86+ dispatchStrategy = new SingleProducerStrategy <>(list );
87+ } else {
88+ for (int i = 0 ; i < consumers ; i ++) {
89+ AbstractMPCASConsumer <T > mpConsumer = new InternalMPConsumer <>(queueSize , threads , factory , threadNameGenerator );
90+ prepareConsumer (mpConsumer );
91+ if (!mpConsumer .start ()) {
92+ return false ;
93+ }
94+ list .add (mpConsumer );
95+ }
96+ dispatchStrategy = new RoundRobinStrategy <>(list );
9597 }
96- list .add (new ConsumerWrapper (mpConsumer ));
9798 this .state = State .RUNNING ;
9899 return true ;
99100 }
@@ -132,7 +133,7 @@ private Future<Void> terminateHelper(Function<Consumer<T>, Future<Void>> functio
132133 delegate .checkTerminated (this );
133134 final Future [] futures = new Future [this .consumers ];
134135 for (int i = 0 ; i < this .consumers ; i ++) {
135- futures [i ] = function .apply (list .get (i ). consumer );
136+ futures [i ] = function .apply (list .get (i ));
136137 }
137138 final CompletableFuture <Void > future = new CompletableFuture <>();
138139 new Thread (() -> {
@@ -164,48 +165,12 @@ public State getState() {
164165 @ Override
165166 public Consumer <T > acquire () {
166167 delegate .checkRunning (this );
167- Consumer <T > result = acquireSPSC ();
168- if (result == null ) {
169- if (log != null && log .isDebugEnabled ()) {
170- log .debug ("Currently there are no sp consumer available, so use mp instead." );
171- }
172- result = acquireMPMC ();
173- }
174- return result ;
175- }
176-
177- @ Override
178- public Consumer <T > acquireSPSC () {
179- delegate .checkRunning (this );
180- Consumer <T > result = null ;
181- synchronized (monitor ) {
182- for (int i = 0 ; i < consumers - 1 ; i ++) {
183- ConsumerWrapper wrapper = list .get (i );
184- if (wrapper .available ) {
185- result = wrapper .consumer ;
186- wrapper .available = false ;
187- break ;
188- }
189- }
190- }
191- return result ;
192- }
193-
194- @ Override
195- public MultiThreadsConsumer <T > acquireMPMC () {
196- delegate .checkRunning (this );
197- return (MultiThreadsConsumer <T >) list .get (consumers - 1 ).consumer ;
168+ return dispatchStrategy .acquire ();
198169 }
199170
200171 @ Override
201172 public void release (Consumer <T > consumer ) {
202- if (!(consumer instanceof MultiThreadsConsumer )) {
203- //无需释放
204- synchronized (monitor ) {
205- InternalSPConsumer internal = (InternalSPConsumer ) consumer ;
206- internal .wrapper .available = true ;
207- }
208- }
173+ dispatchStrategy .release (consumer );
209174 }
210175
211176 @ SuppressWarnings ("unused" )
@@ -249,68 +214,4 @@ public ThreadNameGenerator getThreadNameGenerator() {
249214 return threadNameGenerator ;
250215 }
251216
252- /**
253- * {@link AbstractSPCASConsumer}实现,将其consume方法委托给{@link ConsumeAction#consume(Object)}.
254- */
255- private class InternalSPConsumer extends AbstractSPCASConsumer <T > {
256-
257- private ConsumerWrapper wrapper ;
258-
259- private final ConsumeAction <T > action ;
260-
261- InternalSPConsumer (int queueSize , int threads ) {
262- super (queueSize , threads );
263- this .action = factory .newAction ();
264- }
265-
266- @ Override
267- public void consume (T task ) {
268- action .consume (task );
269- }
270-
271- @ Override
272- protected String getThreadName (Thread t ) {
273- return (threadNameGenerator == null ? super .getThreadName (t ) : threadNameGenerator .generate (t ));
274- }
275-
276- }
277-
278- /**
279- * {@link AbstractMPCASConsumer}实现,将其consume方法委托给{@link ConsumeAction#consume(Object)}.
280- */
281- private class InternalMPConsumer extends AbstractMPCASConsumer <T > {
282-
283- private final ConsumeAction <T > action ;
284-
285- InternalMPConsumer (int queueSize , int threads ) {
286- super (queueSize , threads );
287- this .action = factory .newAction ();
288- }
289-
290- @ Override
291- public void consume (T task ) {
292- action .consume (task );
293- }
294-
295- @ Override
296- protected String getThreadName (Thread t ) {
297- return (threadNameGenerator == null ? super .getThreadName (t ) : threadNameGenerator .generate (t ));
298- }
299-
300- }
301-
302- /**
303- * 对{@link Consumer}进行包装,增加是否可用的标志位.
304- */
305- private class ConsumerWrapper {
306-
307- private boolean available = true ;
308- private final Consumer <T > consumer ;
309-
310- ConsumerWrapper (Consumer <T > consumer ) {
311- this .consumer = consumer ;
312- }
313-
314- }
315-
316217}
0 commit comments