Skip to content

Commit 5ad703f

Browse files
committed
添加 连接阿里 recketmq 版本
1 parent b0977ca commit 5ad703f

File tree

10 files changed

+436
-0
lines changed

10 files changed

+436
-0
lines changed

springboot-rocketmq-ali/pom.xml

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>cn.abel</groupId>
8+
<artifactId>springboot-rocketmq-ali</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
<parent>
11+
<groupId>org.springframework.boot</groupId>
12+
<artifactId>spring-boot-starter-parent</artifactId>
13+
<version>2.0.4.RELEASE</version>
14+
<relativePath/> <!-- lookup parent from repository -->
15+
</parent>
16+
17+
<dependencies>
18+
<!--springboot-->
19+
<dependency>
20+
<groupId>org.springframework.boot</groupId>
21+
<artifactId>spring-boot-starter-web</artifactId>
22+
</dependency>
23+
<dependency>
24+
<groupId>org.springframework.boot</groupId>
25+
<artifactId>spring-boot-starter-test</artifactId>
26+
<scope>test</scope>
27+
</dependency>
28+
29+
30+
<dependency>
31+
<groupId>org.apache.commons</groupId>
32+
<artifactId>commons-pool2</artifactId>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.springframework.boot</groupId>
36+
<artifactId>spring-boot-autoconfigure</artifactId>
37+
</dependency>
38+
<!--rockermq-->
39+
<dependency>
40+
<groupId>com.aliyun.openservices</groupId>
41+
<artifactId>ons-client</artifactId>
42+
<version>1.8.3.Final</version>
43+
</dependency>
44+
<dependency>
45+
<groupId>com.alibaba</groupId>
46+
<artifactId>fastjson</artifactId>
47+
<version>1.2.58</version>
48+
</dependency>
49+
50+
</dependencies>
51+
<build>
52+
<plugins>
53+
<plugin>
54+
<groupId>org.springframework.boot</groupId>
55+
<artifactId>spring-boot-maven-plugin</artifactId>
56+
</plugin>
57+
<plugin>
58+
<groupId>org.apache.maven.plugins</groupId>
59+
<artifactId>maven-compiler-plugin</artifactId>
60+
</plugin>
61+
<plugin>
62+
<groupId>org.apache.maven.plugins</groupId>
63+
<artifactId>maven-surefire-plugin</artifactId>
64+
<version>2.18.1</version>
65+
<configuration>
66+
<skipTests>true</skipTests>
67+
</configuration>
68+
</plugin>
69+
</plugins>
70+
</build>
71+
</project>
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package cn.abel.queue;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
/**
7+
* @author yyb
8+
* @time 2019/8/13
9+
*/
10+
@SpringBootApplication
11+
public class Application {
12+
public static void main(String[] args) {
13+
SpringApplication.run(Application.class, args);
14+
}
15+
16+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package cn.abel.queue.config;
2+
3+
import cn.abel.queue.service.MessageHandler;
4+
import com.aliyun.openservices.ons.api.MessageListener;
5+
import com.aliyun.openservices.ons.api.PropertyKeyConst;
6+
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
7+
import com.aliyun.openservices.ons.api.bean.Subscription;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.springframework.beans.factory.annotation.Autowired;
11+
import org.springframework.beans.factory.annotation.Value;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.context.annotation.Configuration;
14+
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
import java.util.Properties;
18+
19+
/**
20+
* @author yyb
21+
* @time 2020/3/25
22+
*/
23+
24+
@Configuration
25+
public class ALiConsumerClient {
26+
private static final Logger logger = LoggerFactory.getLogger(ALiConsumerClient.class);
27+
/**
28+
* topic tag。
29+
*/
30+
private static final String TOPIC_TAG = "*";
31+
/**
32+
* 消费者线程数。
33+
*/
34+
private static final int CONSUMER_THREAD_COUNT = 10;
35+
/**
36+
* 重新消费次数。
37+
*/
38+
private static final int RECONSUME_TIMES = 2;
39+
40+
private MessageHandler handler;
41+
42+
@Value("${ali.rocket.mq.access-key}")
43+
private String accessKey;
44+
@Value("${ali.rocket.mq.secret-key}")
45+
private String secretKey;
46+
@Value("${ali.rocket.mq.name-srv-addr}")
47+
private String nameSrvAddr;
48+
@Value("${ali.rocket.mq.test-topic}")
49+
private String topic;
50+
@Value("${ali.rocket.mq.test-group-id}")
51+
private String group;
52+
53+
@Bean(initMethod = "start", destroyMethod = "shutdown")
54+
public ConsumerBean buildConsumer() {
55+
ConsumerBean consumerBean = new ConsumerBean();
56+
Properties properties = new Properties();
57+
properties.setProperty(PropertyKeyConst.AccessKey, accessKey);
58+
properties.setProperty(PropertyKeyConst.SecretKey, secretKey);
59+
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
60+
properties.setProperty(PropertyKeyConst.GROUP_ID, group);
61+
properties.setProperty(PropertyKeyConst.ConsumeThreadNums, String.valueOf(CONSUMER_THREAD_COUNT));
62+
properties.setProperty(PropertyKeyConst.MaxReconsumeTimes, String.valueOf(RECONSUME_TIMES));
63+
consumerBean.setProperties(properties);
64+
65+
Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
66+
Subscription subscription = new Subscription();
67+
subscription.setTopic(topic);
68+
subscription.setExpression(TOPIC_TAG);
69+
subscriptionTable.put(subscription, handler);
70+
consumerBean.setSubscriptionTable(subscriptionTable);
71+
logger.info("初始化rocketMq完成 config:{}");
72+
return consumerBean;
73+
}
74+
75+
@Autowired
76+
public ALiConsumerClient(MessageHandler handler) {
77+
this.handler = handler;
78+
}
79+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package cn.abel.queue.config;
2+
3+
import com.aliyun.openservices.ons.api.PropertyKeyConst;
4+
import org.springframework.boot.context.properties.ConfigurationProperties;
5+
import org.springframework.stereotype.Component;
6+
7+
import java.util.Properties;
8+
9+
/**
10+
* @author yyb
11+
* @time 2020/3/25
12+
*/
13+
@Component
14+
@ConfigurationProperties(prefix = "ali.rocket.mq")
15+
public class ALiMqConfig {
16+
private String accessKey;
17+
private String secretKey;
18+
private String nameSrvAddr;
19+
private String testTopic;
20+
private String testGroupId;
21+
private String testTag;
22+
23+
public Properties getMqProperties() {
24+
Properties properties = new Properties();
25+
properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
26+
properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
27+
properties.setProperty(PropertyKeyConst.GROUP_ID, testGroupId);
28+
properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
29+
return properties;
30+
}
31+
32+
public String getAccessKey() {
33+
return accessKey;
34+
}
35+
36+
public void setAccessKey(String accessKey) {
37+
this.accessKey = accessKey;
38+
}
39+
40+
public String getSecretKey() {
41+
return secretKey;
42+
}
43+
44+
public void setSecretKey(String secretKey) {
45+
this.secretKey = secretKey;
46+
}
47+
48+
public String getNameSrvAddr() {
49+
return nameSrvAddr;
50+
}
51+
52+
public void setNameSrvAddr(String nameSrvAddr) {
53+
this.nameSrvAddr = nameSrvAddr;
54+
}
55+
56+
public String getTestTopic() {
57+
return testTopic;
58+
}
59+
60+
public void setTestTopic(String testTopic) {
61+
this.testTopic = testTopic;
62+
}
63+
64+
public String getTestGroupId() {
65+
return testGroupId;
66+
}
67+
68+
public void setTestGroupId(String testGroupId) {
69+
this.testGroupId = testGroupId;
70+
}
71+
72+
public String getTestTag() {
73+
return testTag;
74+
}
75+
76+
public void setTestTag(String testTag) {
77+
this.testTag = testTag;
78+
}
79+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package cn.abel.queue.config;
2+
3+
import com.aliyun.openservices.ons.api.bean.ProducerBean;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.context.annotation.Bean;
6+
import org.springframework.stereotype.Component;
7+
8+
/**
9+
* @author yyb
10+
* @time 2020/3/25
11+
*/
12+
@Component
13+
public class ALiProducerClient {
14+
@Autowired
15+
private ALiMqConfig aLiMqConfig;
16+
17+
@Bean(name = "producer", initMethod = "start", destroyMethod = "shutdown")
18+
public ProducerBean build() {
19+
ProducerBean bean = new ProducerBean();
20+
bean.setProperties(aLiMqConfig.getMqProperties());
21+
return bean;
22+
}
23+
24+
@Autowired
25+
public ALiProducerClient(ALiMqConfig aLiMqConfig) {
26+
this.aLiMqConfig = aLiMqConfig;
27+
}
28+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package cn.abel.queue.service;
2+
3+
import com.aliyun.openservices.ons.api.Action;
4+
import com.aliyun.openservices.ons.api.ConsumeContext;
5+
import com.aliyun.openservices.ons.api.Message;
6+
import com.aliyun.openservices.ons.api.MessageListener;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.stereotype.Component;
10+
11+
import java.nio.charset.StandardCharsets;
12+
13+
/**
14+
* @author yyb
15+
* @time 2020/3/25
16+
*/
17+
@Component
18+
public class MessageHandler implements MessageListener {
19+
private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
20+
21+
@Override
22+
public Action consume(Message message, ConsumeContext consumeContext) {
23+
String msgStr = new String(message.getBody(), StandardCharsets.UTF_8);
24+
//接收到消息
25+
logger.info("Receive message msgId:{} retryTimes:{} body:{}", message.getMsgID(),
26+
message.getReconsumeTimes(), msgStr);
27+
//提交
28+
return Action.CommitMessage;
29+
}
30+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package cn.abel.queue.service;
2+
3+
import cn.abel.queue.config.ALiMqConfig;
4+
import com.alibaba.fastjson.JSON;
5+
import com.aliyun.openservices.ons.api.Message;
6+
import com.aliyun.openservices.ons.api.OnExceptionContext;
7+
import com.aliyun.openservices.ons.api.SendCallback;
8+
import com.aliyun.openservices.ons.api.SendResult;
9+
import com.aliyun.openservices.ons.api.bean.ProducerBean;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
import org.springframework.beans.factory.annotation.Autowired;
13+
import org.springframework.stereotype.Service;
14+
15+
import java.nio.charset.StandardCharsets;
16+
import java.util.function.Predicate;
17+
18+
19+
/**
20+
* @author yyb
21+
* @time 2019/8/13
22+
*/
23+
24+
@Service
25+
public class ProducerService {
26+
private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);
27+
28+
@Autowired
29+
private ALiMqConfig aLiMqConfig;
30+
@Autowired
31+
private ProducerBean producer;
32+
33+
34+
/**
35+
* 调用此方法 发送消息,o 为自定义的消息体
36+
*/
37+
public void sendMessage(Object o) {
38+
Message message = new Message(aLiMqConfig.getStockTopic(), "test", JSON.toJSON(o).toString().getBytes(StandardCharsets.UTF_8));
39+
//向mq发送消息
40+
sendAsync(ProducerBean::isStarted, producer, message);
41+
}
42+
43+
/**
44+
* 向阿里云rocket mq发送消息。
45+
*
46+
* @param predicate 断言
47+
* @param bean 发送消息bean
48+
* @param message 需要发送的消息
49+
*/
50+
private static void sendAsync(Predicate<ProducerBean> predicate, ProducerBean bean, Message message) {
51+
if (predicate.test(bean)) {
52+
bean.sendAsync(message, new SendCallback() {
53+
@Override
54+
public void onSuccess(SendResult sendResult) {
55+
logger.info("向mq推送库存消息成功,消息是:{}", sendResult.toString());
56+
}
57+
@Override
58+
public void onException(OnExceptionContext e) {
59+
logger.error("向mq推送库存消息失败,消息id 为 {} 错误是:{}", e.getMessageId(), e.getException().getMessage());
60+
}
61+
});
62+
} else {
63+
logger.error("mq库存生产端启动失败!!!消息是:{}", message.toString());
64+
}
65+
}
66+
67+
}

0 commit comments

Comments
 (0)