Skip to content

Commit c5bb4f2

Browse files
committed
添加了redis消息队列
1 parent a4e9fc7 commit c5bb4f2

File tree

14 files changed

+249
-44
lines changed

14 files changed

+249
-44
lines changed

cockroach-annotation/cockroach-annotation.iml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3-
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
3+
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
44
<output url="file://$MODULE_DIR$/target/classes" />
55
<output-test url="file://$MODULE_DIR$/target/test-classes" />
66
<content url="file://$MODULE_DIR$">

cockroach-core/cockroach-core.iml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3-
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
3+
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
44
<output url="file://$MODULE_DIR$/target/classes" />
55
<output-test url="file://$MODULE_DIR$/target/test-classes" />
66
<content url="file://$MODULE_DIR$">

cockroach-core/src/main/java/com/zhangyingwei/cockroach/executer/task/Task.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public class Task implements Comparable<Task> {
2020
//每一个任务都会有一个分组,如果没有设置,默认为 default
2121
private String group = Constants.APP_TASK_GROUP_DEFAULT;
2222
private String url;
23-
private Map<String, Object> params;
23+
private Map<String, Object> params = new HashMap<String, Object>();;
2424
private List<String> selects;
2525
private Object extr;
2626
private Integer retry = Constants.DEFAULT_TASK_RETRY;
@@ -46,22 +46,23 @@ public Task(String url, String group, Map<String, Object> params) {
4646
this.params = params;
4747
}
4848

49+
public Task() {
50+
}
51+
4952
public String getGroup() {
5053
return group;
5154
}
5255

53-
public Task setGroup(String group) {
56+
public void setGroup(String group) {
5457
this.group = group;
55-
return this;
5658
}
5759

5860
public String getId() {
5961
return id;
6062
}
6163

62-
public Task setId(String id) {
64+
public void setId(String id) {
6365
this.id = id;
64-
return this;
6566
}
6667

6768
public String getUrl() {
@@ -79,9 +80,6 @@ public void setUrl(String url) {
7980
}
8081

8182
public Map<String, Object> getParams() {
82-
if (this.params == null) {
83-
this.params = new HashMap<String, Object>();
84-
}
8583
return params;
8684
}
8785

@@ -103,9 +101,8 @@ public Object getExtr() {
103101
return extr;
104102
}
105103

106-
public Task setExtr(Object extr) {
104+
public void setExtr(Object extr) {
107105
this.extr = extr;
108-
return this;
109106
}
110107

111108
@Override
@@ -186,4 +183,16 @@ public Task nextDeepBy(Task task) {
186183
public int compareTo(Task task) {
187184
return task.getDeep() - this.getDeep();
188185
}
186+
187+
public void setSelects(List<String> selects) {
188+
this.selects = selects;
189+
}
190+
191+
public void setRetry(Integer retry) {
192+
this.retry = retry;
193+
}
194+
195+
public void setDeep(Integer deep) {
196+
this.deep = deep;
197+
}
189198
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.zhangyingwei.cockroach.queue;
2+
3+
import com.zhangyingwei.cockroach.executer.task.Task;
4+
import com.zhangyingwei.cockroach.queue.filter.IQueueTaskFilter;
5+
import com.zhangyingwei.cockroach.queue.filter.TaskFilterBox;
6+
7+
import java.util.List;
8+
9+
/**
10+
* Created by zhangyw on 2017/9/13.
11+
* 队列接口
12+
*/
13+
public abstract class AbstractCockroachQueue implements CockroachQueue {
14+
protected TaskFilterBox filterBox;
15+
16+
public AbstractCockroachQueue() {
17+
this.filterBox = new TaskFilterBox();
18+
}
19+
20+
@Override
21+
public CockroachQueue filter(IQueueTaskFilter filter) throws Exception {
22+
this.filterBox.add(filter);
23+
return this;
24+
}
25+
}

cockroach-core/src/main/java/com/zhangyingwei/cockroach/queue/TaskQueue.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515
* Created by zhangyw on 2017/8/10.
1616
* 消息队列
1717
*/
18-
public class TaskQueue implements CockroachQueue {
18+
public class TaskQueue extends AbstractCockroachQueue {
1919
private Logger logger = Logger.getLogger(TaskQueue.class);
2020

2121
private BlockingQueue<Task> queue;
2222
private BlockingQueue<Task> faildQueue;
2323

24-
private TaskFilterBox filterBox;
25-
2624
public static TaskQueue of(){
2725
return TaskQueue.of(Constants.DEFAULT_QUEUE_CALACITY);
2826
}
@@ -34,7 +32,6 @@ public static TaskQueue of(int calacity){
3432
public TaskQueue(Integer calacity) {
3533
this.queue = new PriorityBlockingQueue<Task>(calacity,new TaskCompatator());
3634
this.faildQueue = new PriorityBlockingQueue<Task>();
37-
this.filterBox = new TaskFilterBox();
3835
logger.info("create queue whith calacity " + calacity);
3936
}
4037

@@ -73,18 +70,18 @@ private synchronized void queueValid() throws InterruptedException {
7370

7471
@Override
7572
public void push(Task task) throws InterruptedException {
76-
this.push(task, true);
73+
this.queue.put(task);
74+
logger.info(Thread.currentThread().getName() + " push task " + task);
7775
}
7876

7977
@Override
8078
public void push(Task task, Boolean withFilter) throws InterruptedException {
8179
Boolean allow = true;
8280
if (withFilter) {
83-
allow = filterBox.accept(task);
81+
allow = super.filterBox.accept(task);
8482
}
8583
if (allow) {
86-
this.queue.put(task);
87-
logger.info(Thread.currentThread().getName() + " push task " + task);
84+
this.push(task);
8885
}
8986
}
9087

@@ -120,12 +117,6 @@ public void clear(){
120117
logger.info(Thread.currentThread().getName() + " clear queue");
121118
}
122119

123-
@Override
124-
public CockroachQueue filter(IQueueTaskFilter filter) throws Exception {
125-
this.filterBox.add(filter);
126-
return this;
127-
}
128-
129120
@Override
130121
public Boolean isEmpty() {
131122
return this.queue.isEmpty();

cockroach-core/src/main/java/com/zhangyingwei/cockroach/store/PrintStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@
1010
public class PrintStore implements IStore {
1111
@Override
1212
public void store(TaskResponse response) throws IOException {
13-
System.out.println(response.getContent());
13+
System.out.println(response.getContent().string());
1414
}
1515
}

cockroach-queue-redis/pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>cockroach</artifactId>
7+
<groupId>com.github.zhangyingwei</groupId>
8+
<version>1.0.5.04-Beta</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>cockroach-queue-redis</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>com.github.zhangyingwei</groupId>
17+
<artifactId>cockroach-core</artifactId>
18+
<version>1.0.5.04-Beta</version>
19+
</dependency>
20+
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
21+
<dependency>
22+
<groupId>redis.clients</groupId>
23+
<artifactId>jedis</artifactId>
24+
<version>2.9.0</version>
25+
</dependency>
26+
</dependencies>
27+
28+
29+
</project>
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.zhangyingwei.cockroach.queue;
2+
3+
import com.zhangyingwei.cockroach.executer.task.Task;
4+
import com.zhangyingwei.cockroach.queue.filter.IQueueTaskFilter;
5+
import net.sf.json.JSONObject;
6+
import redis.clients.jedis.Jedis;
7+
8+
import java.util.List;
9+
import java.util.stream.Collectors;
10+
11+
/**
12+
* Created by zhangyw on 2018/2/27.
13+
*/
14+
public class RedisTaskQueue extends AbstractCockroachQueue {
15+
private String key;
16+
private String faildKey;
17+
private Jedis jedis;
18+
19+
public RedisTaskQueue(String host, Integer port,String key) {
20+
this.key = key;
21+
this.jedis = new Jedis(host, port);
22+
}
23+
24+
public static RedisTaskQueue of(String host, Integer port,String key){
25+
return new RedisTaskQueue(host,port,key);
26+
}
27+
28+
@Override
29+
public synchronized Task poll() throws Exception {
30+
String json = this.jedis.rpop(this.key);
31+
JSONObject jsonObject = JSONObject.fromObject(json);
32+
return (Task) JSONObject.toBean(jsonObject, Task.class);
33+
}
34+
35+
@Override
36+
public synchronized Task take() throws Exception {
37+
List<String> json = this.jedis.brpop(Integer.MAX_VALUE,this.key);
38+
JSONObject jsonObject = JSONObject.fromObject(json.get(1));
39+
return (Task) JSONObject.toBean(jsonObject, Task.class);
40+
}
41+
42+
@Override
43+
public void push(Task task) throws Exception {
44+
this.push(task,true);
45+
}
46+
47+
@Override
48+
public void push(Task task, Boolean withFilter) throws Exception {
49+
if (withFilter) {
50+
if (super.filterBox.accept(task)) {
51+
JSONObject json = JSONObject.fromObject(task);
52+
this.jedis.lpush(this.key, json.toString());
53+
}
54+
} else {
55+
JSONObject json = JSONObject.fromObject(task);
56+
this.jedis.lpush(this.key, json.toString());
57+
}
58+
}
59+
60+
@Override
61+
public void falied(Task task) throws Exception {
62+
JSONObject json = JSONObject.fromObject(task);
63+
this.jedis.lpush(this.faildKey, json.toString());
64+
}
65+
66+
@Override
67+
public void pushAll(List<Task> tasks) throws Exception {
68+
for (Task task : tasks) {
69+
push(task);
70+
}
71+
}
72+
73+
@Override
74+
public void push(List<String> urls) throws Exception {
75+
List<Task> tasks = urls.stream().map(url -> {
76+
return new Task(url);
77+
}).collect(Collectors.toList());
78+
pushAll(tasks);
79+
}
80+
81+
@Override
82+
public void clear() throws Exception {
83+
this.jedis.del(this.key);
84+
this.jedis.del(this.faildKey);
85+
}
86+
87+
@Override
88+
public Boolean isEmpty() {
89+
long size = this.jedis.llen(this.key);
90+
long faildSize = this.jedis.llen(this.faildKey);
91+
return size + faildSize == 0;
92+
}
93+
}

cockroach-samples/cockroach-samples.iml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<module type="JAVA_MODULE" version="4">
3-
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
2+
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3+
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
44
<output url="file://$MODULE_DIR$/target/classes" />
55
<output-test url="file://$MODULE_DIR$/target/test-classes" />
66
<content url="file://$MODULE_DIR$">
77
<sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
88
<sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
9+
<sourceFolder url="file://$MODULE_DIR$/src/test/java" isTestSource="true" />
910
<excludeFolder url="file://$MODULE_DIR$/target" />
1011
</content>
1112
<orderEntry type="inheritedJdk" />

cockroach-test/cockroach-test.iml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<module type="JAVA_MODULE" version="4">
3-
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8" inherit-compiler-output="false">
2+
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
3+
<component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
44
<output url="file://$MODULE_DIR$/target/classes" />
55
<output-test url="file://$MODULE_DIR$/target/test-classes" />
66
<content url="file://$MODULE_DIR$">
@@ -23,6 +23,9 @@
2323
<orderEntry type="library" name="Maven: cn.wanghaomiao:JsoupXpath:0.3.2" level="project" />
2424
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.3.2" level="project" />
2525
<orderEntry type="module" module-name="cockroach-annotation" />
26+
<orderEntry type="module" module-name="cockroach-queue-redis" />
27+
<orderEntry type="library" name="Maven: redis.clients:jedis:2.9.0" level="project" />
28+
<orderEntry type="library" name="Maven: org.apache.commons:commons-pool2:2.4.2" level="project" />
2629
<orderEntry type="library" name="Maven: junit:junit:4.12" level="project" />
2730
<orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.3" level="project" />
2831
<orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />

0 commit comments

Comments
 (0)