Skip to content

Commit 0780df2

Browse files
committed
优化ConsumerPool模型
1 parent 031e63f commit 0780df2

File tree

10 files changed

+207
-112
lines changed

10 files changed

+207
-112
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
<groupId>skywalker</groupId>
66
<artifactId>Consumer</artifactId>
7-
<version>1.2-SNAPSHOT</version>
7+
<version>1.3-SNAPSHOT</version>
88
<build>
99
<plugins>
1010
<plugin>
@@ -43,7 +43,7 @@
4343
<dependency>
4444
<groupId>org.jctools</groupId>
4545
<artifactId>jctools-core</artifactId>
46-
<version>2.0</version>
46+
<version>2.0.1</version>
4747
</dependency>
4848

4949
<dependency>
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package consumer.cas;
2+
3+
import consumer.MultiThreadsConsumer;
4+
import consumer.queue.SQueue;
5+
import consumer.queue.cas.MpmcBasedQueue;
6+
import consumer.queue.cas.MpscBasedQueue;
7+
8+
/**
9+
* {@link MultiThreadsConsumer}实现,暂且只提供给予CAS的多线程消费者实现.
10+
*
11+
* @author skywalker
12+
*/
13+
public abstract class AbstractMPCASConsumer<T> extends AbstractSPCASConsumer<T> implements MultiThreadsConsumer<T> {
14+
15+
public AbstractMPCASConsumer(int queueSize, int threads) {
16+
super(queueSize, threads);
17+
}
18+
19+
@Override
20+
protected final SQueue<T> newQueue() {
21+
return (threads > 1 ? new MpmcBasedQueue<T>(queueSize) : new MpscBasedQueue<T>(queueSize));
22+
}
23+
24+
}

src/main/java/consumer/cas/AbstractMultiThreadsConsumer.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

src/main/java/consumer/cas/AbstractCASConsumer.java renamed to src/main/java/consumer/cas/AbstractSPCASConsumer.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,40 @@
44
import consumer.cas.strategy.BlockStrategy;
55
import consumer.cas.strategy.RetryStrategy;
66
import consumer.queue.SQueue;
7+
import consumer.queue.cas.SpmcBasedQueue;
78
import consumer.queue.cas.SpscBasedQueue;
89

910
import java.util.Objects;
11+
import java.util.concurrent.ExecutorService;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ThreadFactory;
1014

1115
/**
12-
* {@link AbstractQueuedConsumer}骨架实现,基于CAS操作的消费者实现.
16+
* {@link AbstractQueuedConsumer}骨架实现,基于无锁队列实现,子类可指定消费者线程数,
17+
* 如果有一个消费线程,那么使用{@link org.jctools.queues.SpscArrayQueue},否则使用
18+
* {@link org.jctools.queues.SpmcArrayQueue}.
1319
* <br>
1420
* 默认采用阻塞的等待策略.
1521
*
1622
* @author skywalker
1723
*/
18-
public abstract class AbstractCASConsumer<T> extends AbstractQueuedConsumer<T> {
24+
public abstract class AbstractSPCASConsumer<T> extends AbstractQueuedConsumer<T> {
1925

2026
private RetryStrategy<T> retryStrategy = new BlockStrategy<T>();
2127

22-
public AbstractCASConsumer(int queueSize) {
28+
/**
29+
* 消费线程的数量.
30+
*/
31+
protected final int threads;
32+
33+
public AbstractSPCASConsumer(int queueSize, int threads) {
2334
super(queueSize);
35+
this.threads = threads;
2436
}
2537

2638
@Override
2739
protected SQueue<T> newQueue() {
28-
return new SpscBasedQueue<T>(queueSize);
40+
return (threads > 1 ? new SpmcBasedQueue<T>(queueSize) : new SpscBasedQueue<T>(queueSize));
2941
}
3042

3143
@Override
@@ -59,4 +71,14 @@ public void setRetryStrategy(RetryStrategy<T> retryStrategy) {
5971
this.retryStrategy = retryStrategy;
6072
}
6173

74+
@Override
75+
protected final ExecutorService startExecutor(ThreadFactory threadFactory) {
76+
ExecutorService service = Executors.newFixedThreadPool(threads, threadFactory);
77+
int index = 0;
78+
while (index++ < threads) {
79+
service.execute(this);
80+
}
81+
return service;
82+
}
83+
6284
}

src/main/java/consumer/cas/strategy/RetryStrategy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package consumer.cas.strategy;
22

3+
import consumer.cas.AbstractSPCASConsumer;
34
import consumer.queue.SQueue;
45

56
/**
6-
* 当{@link consumer.cas.AbstractCASConsumer}获取任务失败时采取的重试策略.
7+
* 当{@link AbstractSPCASConsumer}获取任务失败时采取的重试策略.
78
*
89
* @author skywalker
910
*/

0 commit comments

Comments
 (0)