Skip to content

Commit 431f7c9

Browse files
committed
学习并使用hbase
1 parent a8df49f commit 431f7c9

File tree

4 files changed

+242
-6
lines changed

4 files changed

+242
-6
lines changed

es/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
docker run --name es -d -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.2.0
2+
3+
4+
unzip elasticsearch-analysis-ik-7.2.0.zip -d ik
5+
6+
7+
docker cp ik es:/usr/share/elasticsearch/plugins/ik/
8+
9+
docker restart es

hbase/README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
docker run -d -h docker-hbase \
2+
-p 2181:2181 \
3+
-p 8080:8080 \
4+
-p 8085:8085 \
5+
-p 9090:9090 \
6+
-p 9000:9000 \
7+
-p 9095:9095 \
8+
-p 16000:16000 \
9+
-p 16010:16010 \
10+
-p 16201:16201 \
11+
-p 16301:16301 \
12+
-p 16020:16020\
13+
--name hbase \
14+
harisekhon/hbase
15+
16+
docker exec -it hbase /bin/bash
17+
18+
hbase shell
19+
20+
#### info为列簇,不建议有过多列簇
21+
create 'user','info'
22+
23+
给 hosts 添加
24+
docker-hbase 你的docker地址
25+

hbase/hbase_test.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
package hbase
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/tsuna/gohbase"
7+
"github.com/tsuna/gohbase/filter"
8+
"github.com/tsuna/gohbase/hrpc"
9+
"io"
10+
"math"
11+
"os"
12+
"strconv"
13+
"testing"
14+
"time"
15+
)
16+
17+
var client gohbase.Client
18+
19+
//每一秒产生一个
20+
func GetRowKey(uid uint32) string {
21+
return fmt.Sprintf("%s%d", strconv.Itoa(int(uid)), math.MaxInt64-time.Now().Unix())
22+
}
23+
func reverse(str string) string {
24+
var result string
25+
length := len(str)
26+
for i := 0; i < length; i++ {
27+
result += fmt.Sprintf("%c", str[length-i-1])
28+
}
29+
return result
30+
}
31+
32+
func TestMain(m *testing.M) {
33+
client = gohbase.NewClient("172.13.3.160")
34+
os.Exit(m.Run())
35+
}
36+
37+
func TestCreate(t *testing.T) {
38+
for i := 10; i <= 60; i++ {
39+
val := map[string]map[string][]byte{
40+
"info": {
41+
"name": []byte(fmt.Sprintf("test%d", i)),
42+
"age": []byte(fmt.Sprintf("%d", i)),
43+
},
44+
}
45+
rowKey := GetRowKey(uint32(i))
46+
t.Log("rowKey", rowKey)
47+
p, err := hrpc.NewPutStr(context.Background(), "user", rowKey, val)
48+
if err != nil {
49+
t.Error(err)
50+
return
51+
}
52+
rsp, err := client.Put(p)
53+
if err != nil {
54+
t.Error(err)
55+
return
56+
}
57+
t.Log(rsp.String())
58+
}
59+
}
60+
61+
func TestGet(t *testing.T) {
62+
q, err := hrpc.NewGetStr(context.Background(), "user", "543219223372035248499836")
63+
rsp, err := client.Get(q)
64+
if err != nil {
65+
t.Error(err)
66+
return
67+
}
68+
t.Log(rsp.String())
69+
for _, v := range rsp.Cells {
70+
val := v
71+
t.Log(string(val.Qualifier), string(val.Value))
72+
}
73+
}
74+
75+
func TestDel(t *testing.T) {
76+
d, err := hrpc.NewDelStr(context.Background(), "user", "543219223372035248499836", nil)
77+
if err != nil {
78+
t.Error(err)
79+
}
80+
rsp, err := client.Delete(d)
81+
if err != nil {
82+
t.Error(err)
83+
return
84+
}
85+
t.Log(rsp.String())
86+
}
87+
88+
func TestGetFamily(t *testing.T) {
89+
//109223372035248490685 column=info:age, timestamp=1606285123443, value=10
90+
//109223372035248490685 column=info:name, timestamp=1606285123443, value=test10
91+
family := map[string][]string{"info": []string{"name"}}
92+
q, err := hrpc.NewGetStr(context.Background(), "user", "109223372035248490685", hrpc.Families(family))
93+
rsp, err := client.Get(q)
94+
if err != nil {
95+
t.Error(err)
96+
return
97+
}
98+
t.Log(rsp.String())
99+
for _, v := range rsp.Cells {
100+
val := v
101+
t.Log(string(val.Qualifier), string(val.Value))
102+
}
103+
}
104+
105+
func TestScanStr(t *testing.T) {
106+
//rowKey 前缀匹配
107+
pFilter := filter.NewPrefixFilter([]byte("17"))
108+
q, err := hrpc.NewScanStr(context.Background(), "user", hrpc.Filters(pFilter))
109+
if err != nil {
110+
t.Error(err)
111+
return
112+
}
113+
scanRsp := client.Scan(q)
114+
for {
115+
res, err := scanRsp.Next()
116+
if err == io.EOF {
117+
break
118+
}
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
for _, v := range res.Cells {
123+
val := v
124+
t.Log(string(val.Qualifier), string(val.Value))
125+
}
126+
}
127+
}
128+
129+
func TestCreate2(t *testing.T) {
130+
for i := 1; i <= 50; i++ {
131+
val := map[string]map[string][]byte{
132+
"info": {
133+
"name": []byte(fmt.Sprintf("hero%d", i)),
134+
"age": []byte(fmt.Sprintf("%d", i)),
135+
},
136+
}
137+
rowKey := GetRowKey(uint32(123))
138+
t.Log("rowKey", rowKey)
139+
p, err := hrpc.NewPutStr(context.Background(), "user", rowKey, val)
140+
if err != nil {
141+
t.Error(err)
142+
return
143+
}
144+
rsp, err := client.Put(p)
145+
if err != nil {
146+
t.Error(err)
147+
return
148+
}
149+
t.Log(rsp.String())
150+
time.Sleep(time.Second)
151+
}
152+
}
153+
154+
//分页查询
155+
func TestScanRangeStr(t *testing.T) {
156+
f := filter.NewPageFilter(11)
157+
q, err := hrpc.NewScanRangeStr(context.Background(), "user", "1239223372035248488854", "", hrpc.Filters(f))
158+
if err != nil {
159+
t.Error(err)
160+
return
161+
}
162+
scanRsp := client.Scan(q)
163+
for {
164+
res, err := scanRsp.Next()
165+
if err == io.EOF {
166+
break
167+
}
168+
if err != nil {
169+
t.Fatal(err)
170+
}
171+
for _, v := range res.Cells {
172+
val := v
173+
t.Log(string(val.Qualifier), string(val.Value), string(val.Row))
174+
}
175+
}
176+
}
177+
178+
func TestScanRangeStr2(t *testing.T) {
179+
// 获取这个区间的数据
180+
// 1606286961323 2020-11-25 14:49:21
181+
// 1606286952113 2020-11-25 14:49:12
182+
startKey := fmt.Sprintf("%d%d", 123, math.MaxInt64-1606286961)
183+
endKey := fmt.Sprintf("%d%d", 123, math.MaxInt64-1606286952)
184+
q, err := hrpc.NewScanRangeStr(context.Background(), "user", startKey, endKey)
185+
if err != nil {
186+
t.Error(err)
187+
return
188+
}
189+
scanRsp := client.Scan(q)
190+
for {
191+
res, err := scanRsp.Next()
192+
if err == io.EOF {
193+
break
194+
}
195+
if err != nil {
196+
t.Fatal(err)
197+
}
198+
for _, v := range res.Cells {
199+
val := v
200+
t.Log(string(val.Qualifier), string(val.Value), string(val.Row))
201+
}
202+
}
203+
}

kafka/kafka_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ func Test_Queue(t *testing.T) {
1414
SyncProducer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
1515
CheckErr(err)
1616
defer SyncProducer.Close()
17-
SyncProducer.SendMessage(&sarama.ProducerMessage{
18-
})
17+
SyncProducer.SendMessage(&sarama.ProducerMessage{})
1918
}
2019

2120
func Test_Producer(t *testing.T) {
@@ -25,7 +24,7 @@ func Test_Producer(t *testing.T) {
2524
config.Producer.Return.Successes = true
2625
config.Producer.Return.Errors = true
2726
//异步
28-
AsyncProducer, err := sarama.NewAsyncProducer([]string{"localhost:9092","localhost:9093","localhost:9094"}, config)
27+
AsyncProducer, err := sarama.NewAsyncProducer([]string{"localhost:9092", "localhost:9093", "localhost:9094"}, config)
2928
CheckErr(err)
3029
defer AsyncProducer.AsyncClose()
3130
go func() {
@@ -36,7 +35,7 @@ func Test_Producer(t *testing.T) {
3635
case err := <-AsyncProducer.Errors():
3736
fmt.Println("AsyncProducer.Errors()", err.Error())
3837
}
39-
, ,, }
38+
}
4039
}()
4140
for {
4241
Message := &sarama.ProducerMessage{
@@ -52,7 +51,7 @@ func Test_Producer(t *testing.T) {
5251
func Test_ConsumerGroup(t *testing.T) {
5352
config := sarama.NewConfig()
5453
config.Version = sarama.V2_2_0_0
55-
consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092","localhost:9093","localhost:9094"}, "test_group", config)
54+
consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092", "localhost:9093", "localhost:9094"}, "test_group", config)
5655
CheckErr(err)
5756
consumer := Consumer1{}
5857
for {
@@ -65,7 +64,7 @@ func Test_ConsumerGroup(t *testing.T) {
6564
func Test_ConsumerGroup2(t *testing.T) {
6665
config := sarama.NewConfig()
6766
config.Version = sarama.V2_2_0_0
68-
consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092","localhost:9093","localhost:9094"}, "test_group_1", config)
67+
consumerGroup, err := sarama.NewConsumerGroup([]string{"localhost:9092", "localhost:9093", "localhost:9094"}, "test_group_1", config)
6968
CheckErr(err)
7069
consumer := Consumer{}
7170
for {

0 commit comments

Comments
 (0)