Skip to content

Commit 2a3dc99

Browse files
committed
添加rocketmq
1 parent 48a8450 commit 2a3dc99

File tree

3 files changed

+137
-0
lines changed

3 files changed

+137
-0
lines changed

rocketmq/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#### docker run --name rmqnamesrv -d -p 9876:9876 rocketmqinc/rocketmq:latest sh mqnamesrv
2+
3+
4+
5+
#### broker.conf
6+
```base
7+
terName = DefaultCluster
8+
brokerName = broker-a
9+
brokerId = 0
10+
deleteWhen = 04
11+
fileReservedTime = 48
12+
brokerRole = ASYNC_MASTER
13+
flushDiskType = ASYNC_FLUSH
14+
brokerIP1 = 172.13.3.160
15+
autoCreateTopicEnable=true
16+
```
17+
18+
19+
#### docker run --name rmqbroker -d -p 10911:10911 -p 10909:10909 --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" -v /root/docker_data/broker.conf:/opt/rocketmq/conf/broker.conf rocketmqinc/rocketmq:latest sh mqbroker -c /opt/rocketmq/conf/broker.conf
20+
21+
22+
23+
#### docker run --name rmqconsole -d -p 8080:8080 --link rmqnamesrv:namesrv -e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876" pangliang/rocketmq-console-ng

rocketmq/go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module learning_tools/rocketmq
2+
3+
go 1.15
4+
5+
require github.com/apache/rocketmq-client-go/v2 v2.1.0-rc5

rocketmq/mq_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package rocketmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/apache/rocketmq-client-go/v2"
7+
"github.com/apache/rocketmq-client-go/v2/consumer"
8+
"github.com/apache/rocketmq-client-go/v2/primitive"
9+
"github.com/apache/rocketmq-client-go/v2/producer"
10+
"os"
11+
"testing"
12+
"time"
13+
)
14+
15+
var mqProducer rocketmq.Producer
16+
17+
func TestMain(m *testing.M) {
18+
var err error
19+
fmt.Println("注册生产者")
20+
// 注册生产者
21+
if mqProducer, err = rocketmq.NewProducer(
22+
producer.WithNsResovler(primitive.NewPassthroughResolver([]string{"172.13.3.160:9876"})),
23+
producer.WithRetry(3),
24+
); err != nil {
25+
panic(err)
26+
}
27+
mqProducer.Start()
28+
os.Exit(m.Run())
29+
}
30+
31+
func TestMqOneWayProducer(t *testing.T) {
32+
err := mqProducer.SendOneWay(context.Background(), &primitive.Message{
33+
Topic: "msg",
34+
Body: []byte("test"),
35+
})
36+
if err != nil {
37+
t.Error(err)
38+
}
39+
}
40+
41+
func TestMqSyncProducer(t *testing.T) {
42+
res, err := mqProducer.SendSync(context.Background(), &primitive.Message{
43+
Topic: "msg",
44+
Body: []byte("Hello"),
45+
})
46+
if err != nil {
47+
t.Error(err)
48+
return
49+
}
50+
fmt.Println(res)
51+
}
52+
53+
func TestMqASyncProducer(t *testing.T) {
54+
err := mqProducer.SendAsync(context.Background(), func(ctx context.Context, result *primitive.SendResult, err error) {
55+
fmt.Println("回调完", time.Now().UnixNano()/1e6)
56+
fmt.Println("result", result)
57+
fmt.Println("err", err)
58+
}, &primitive.Message{
59+
Topic: "msg",
60+
Body: []byte("Hello SendAsync"),
61+
})
62+
if err != nil {
63+
t.Error(err)
64+
return
65+
}
66+
fmt.Println("执行完", time.Now().UnixNano()/1e6)
67+
select {}
68+
}
69+
func TestMqDelaySyncProducer(t *testing.T) {
70+
a := &primitive.Message{
71+
Topic: "msg",
72+
Body: []byte("Hello TestMqDelaySyncProducer"),
73+
}
74+
a.WithTag("group")
75+
a.WithKeys([]string{"123"})
76+
a.WithDelayTimeLevel(3)
77+
res, err := mqProducer.SendSync(context.Background(), a)
78+
if err != nil {
79+
t.Error(err)
80+
return
81+
}
82+
fmt.Println(res)
83+
}
84+
85+
func TestMqSubscribe(t *testing.T) {
86+
//注册消费者
87+
mqPushConsumer, err := rocketmq.NewPushConsumer(
88+
consumer.WithGroupName("test_consumer"),
89+
consumer.WithNsResovler(primitive.NewPassthroughResolver([]string{"172.13.3.160:9876"})),
90+
)
91+
if err != nil {
92+
panic(err)
93+
}
94+
msgHandler := func(ctx context.Context,
95+
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
96+
for i := range msgs {
97+
fmt.Printf("接受到消息: %v /n", msgs[i])
98+
}
99+
return consumer.ConsumeSuccess, nil
100+
101+
}
102+
err = mqPushConsumer.Subscribe("msg", consumer.MessageSelector{}, msgHandler)
103+
if err != nil {
104+
t.Error(err)
105+
}
106+
fmt.Println("启动消费者")
107+
mqPushConsumer.Start()
108+
select {}
109+
}

0 commit comments

Comments
 (0)