Skip to content

Commit b23f6e8

Browse files
author
jk
committed
基于etcd与viper的配置中心
1 parent 84510fc commit b23f6e8

File tree

4 files changed

+206
-34
lines changed

4 files changed

+206
-34
lines changed

hconf/hconf.go

Lines changed: 161 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,199 @@
11
package hconf
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
57
"github.com/spf13/viper"
8+
"go.etcd.io/etcd/api/v3/mvccpb"
69
clientv3 "go.etcd.io/etcd/client/v3"
10+
"gopkg.in/yaml.v2"
11+
"log"
712
"sync"
13+
"time"
814
)
915

1016
type Register struct {
11-
etcdCli *clientv3.Client
12-
opts *Options
13-
mu sync.Mutex
14-
confDef map[string]interface{}
15-
WatchConfKey []string `json:"watch_conf_key"`
17+
etcdCli *clientv3.Client
18+
opts *Options
19+
viper *viper.Viper
20+
mu sync.Mutex
21+
confDef map[string]interface{}
22+
needPutKey []string
1623
}
1724

1825
func NewHConf(opt ...RegisterOptions) (*Register, error) {
1926
s := &Register{
2027
opts: newOptions(opt...),
2128
confDef: make(map[string]interface{}),
29+
viper: viper.New(),
30+
}
31+
s.viper.SetConfigFile(s.opts.LocalConfName)
32+
if !s.opts.UseLocalConf() {
33+
if len(s.opts.WatchRootName) == 0 {
34+
return nil, errors.New("watch_conf_key is empty")
35+
}
2236
}
2337
etcdCli, err := clientv3.New(s.opts.EtcdConf)
2438
if err != nil {
2539
return nil, err
2640
}
2741
s.etcdCli = etcdCli
42+
if !s.checkEtcd() {
43+
log.Println("Link etcd failed to use local configuration")
44+
s.opts.UseLocal = true
45+
}
2846
return s, nil
2947
}
3048

31-
func (r *Register) Run() {
32-
r.loadLocal()
49+
func (r *Register) Run() error {
50+
if r.opts.UseLocalConf() {
51+
return r.loadLocal()
52+
}
53+
if err := r.putNeedKv(); err != nil {
54+
return err
55+
}
56+
r.watchRookKey()
57+
return nil
3358
}
3459

35-
func (r *Register) loadLocal() {
36-
v := viper.New()
37-
v.SetConfigFile(r.opts.LocalConfName)
38-
if err := v.ReadInConfig(); err != nil {
39-
fmt.Println("1111", err)
40-
return
60+
func (r *Register) putNeedKv() error {
61+
if len(r.needPutKey) == 0 {
62+
return nil
4163
}
42-
for k, _ := range r.confDef {
43-
if err := v.UnmarshalKey(k, r.confDef[k]); err != nil {
44-
fmt.Println("22222", k, err)
45-
return
64+
if err := r.viper.ReadInConfig(); err != nil {
65+
return err
66+
}
67+
for _, k := range r.needPutKey {
68+
var mapData = make(map[string]interface{})
69+
if err := r.viper.UnmarshalKey(k, &mapData); err != nil {
70+
return err
71+
}
72+
if err := r.putEtcdConf(k, mapData); err != nil {
73+
return err
4674
}
75+
_ = r.viper.UnmarshalKey(k, r.confDef[k])
4776
}
77+
return nil
4878
}
4979

50-
func (r *Register) GetConfByKey(key string, val interface{}) {
80+
func (r *Register) GetConfByKey(key string, val interface{}) error {
5181
r.mu.Lock()
5282
defer r.mu.Unlock()
5383
r.confDef[key] = val
54-
if r.opts.UseLocal {
84+
if r.opts.UseLocalConf() {
85+
return nil
86+
}
87+
res, err := r.readEtcdConf(key)
88+
if err != nil {
89+
return err
90+
}
91+
if len(res) == 0 {
92+
r.needPutKey = append(r.needPutKey, key)
93+
return nil
94+
}
95+
return yaml.Unmarshal(res, val)
96+
}
97+
98+
func (r *Register) Close() error {
99+
if err := r.etcdCli.Close(); err != nil {
100+
return err
101+
}
102+
for k, _ := range r.confDef {
103+
r.viper.Set(k, r.confDef[k])
104+
}
105+
return r.viper.WriteConfig()
106+
}
107+
108+
func (r *Register) readEtcdConf(key string) ([]byte, error) {
109+
ctx, cancel := context.WithTimeout(context.Background(), r.opts.EtcdReadTimeOut)
110+
defer cancel()
111+
res, err := r.etcdCli.Get(ctx, key)
112+
if err != nil {
113+
return nil, err
114+
}
115+
for _, v := range res.Kvs {
116+
if string(v.Key) == key {
117+
return v.Value, nil
118+
}
119+
}
120+
return nil, err
121+
}
122+
123+
func (r *Register) putEtcdConf(key string, val interface{}) error {
124+
if fmt.Sprint(val) == "map[]" {
125+
log.Println(fmt.Sprintf("No %s configuration found in etcd or local", key))
126+
return errors.New(fmt.Sprintf("No %s configuration found in etcd or local", key))
127+
}
128+
ctx, cancel := context.WithTimeout(context.Background(), r.opts.EtcdReadTimeOut)
129+
defer cancel()
130+
data, err := yaml.Marshal(val)
131+
if err != nil {
132+
return err
133+
}
134+
_, err = r.etcdCli.Put(ctx, key, string(data))
135+
if err != nil {
136+
return err
137+
}
138+
log.Printf("put key %s ,data %s \n", key, string(data))
139+
return nil
140+
}
141+
142+
func (r *Register) loadLocalKey() error {
143+
if err := r.viper.ReadInConfig(); err != nil {
144+
return err
145+
}
146+
return nil
147+
}
148+
149+
func (r *Register) loadLocal() error {
150+
if err := r.viper.ReadInConfig(); err != nil {
151+
return err
152+
}
153+
for k, _ := range r.confDef {
154+
if err := r.viper.UnmarshalKey(k, r.confDef[k]); err != nil {
155+
return err
156+
}
157+
}
158+
return nil
159+
}
160+
161+
func (r *Register) checkEtcd() bool {
162+
ctx, cancel := context.WithTimeout(context.Background(), r.opts.EtcdReadTimeOut*time.Duration(len(r.opts.EtcdConf.Endpoints)))
163+
defer cancel()
164+
for _, v := range r.opts.EtcdConf.Endpoints {
165+
res, err := r.etcdCli.Status(ctx, v)
166+
if res == nil || err != nil {
167+
return false
168+
}
169+
}
170+
return true
171+
}
172+
173+
func (r *Register) watchRookKey() {
174+
if r.opts.UseLocalConf() {
55175
return
56176
}
177+
for _, v := range r.opts.WatchRootName {
178+
go func(register *Register, key string) {
179+
rch := r.etcdCli.Watch(context.Background(), key, clientv3.WithPrefix())
180+
for res := range rch {
181+
for _, ev := range res.Events {
182+
switch ev.Type {
183+
case mvccpb.PUT: //新增或修改
184+
k := string(ev.Kv.Key)
185+
if val, ok := register.confDef[k]; ok {
186+
register.mu.Lock()
187+
if err := yaml.Unmarshal(ev.Kv.Value, val); err != nil {
188+
log.Printf("change key %s ,data %s,err %s \n", k, string(ev.Kv.Value), err)
189+
}
190+
log.Printf("change key %s success data {%s}\n", k, string(ev.Kv.Value))
191+
register.mu.Unlock()
192+
}
193+
}
194+
}
195+
}
196+
}(r, v)
197+
}
198+
57199
}

hconf/hconf.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/gs/conf/net:
2+
host: "123123111111"
3+
port: "123123"
4+
/gs/conf/net2222:
5+
host: "123123111111"
6+
port: "123123"
7+
/gs/conf/net3333:
8+
host: ""
9+
port: ""

hconf/hconf_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package hconf
22

3-
import "testing"
3+
import (
4+
"testing"
5+
)
46

57
type Conf struct {
6-
Net Net
8+
Net Net
9+
Net2 Net
10+
Net3 Net
711
}
812

913
type Net struct {
@@ -13,12 +17,20 @@ type Net struct {
1317

1418
func TestHConf(t *testing.T) {
1519
var conf = Conf{}
16-
r, err := NewHConf()
20+
r, err := NewHConf(
21+
SetWatchRootName([]string{"/gs/conf"}),
22+
)
1723
if err != nil {
1824
t.Error(err)
1925
return
2026
}
21-
r.GetConfByKey("/gs/conf/net", &conf.Net)
22-
r.Run()
27+
t.Log(r.GetConfByKey("/gs/conf/net", &conf.Net))
28+
t.Log(r.GetConfByKey("/gs/conf/net2222", &conf.Net2))
29+
t.Log(r.GetConfByKey("/gs/conf/net3333", &conf.Net3))
30+
if err := r.Run(); err != nil {
31+
t.Error(err)
32+
return
33+
}
2334
t.Log(conf)
35+
t.Log(r.Close())
2436
}

hconf/options.go

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import (
66
)
77

88
type Options struct {
9-
EtcdConf clientv3.Config `json:"-"`
10-
RegisterTtl time.Duration `json:"register_ttl"`
11-
LocalConfName string `json:"local_conf_name"`
12-
UseLocal bool `json:"use_local"`
13-
WatchRoot []string `json:"watch_root"`
9+
EtcdConf clientv3.Config
10+
EtcdReadTimeOut time.Duration
11+
LocalConfName string
12+
UseLocal bool
13+
WatchRootName []string
1414
}
1515

1616
type RegisterOptions func(*Options)
@@ -21,16 +21,20 @@ func newOptions(opt ...RegisterOptions) *Options {
2121
Endpoints: []string{"127.0.0.1:2379"},
2222
DialTimeout: 3 * time.Second,
2323
},
24-
RegisterTtl: 2 * time.Second,
25-
LocalConfName: "./hconf.yaml",
26-
UseLocal: false,
24+
EtcdReadTimeOut: 2 * time.Second,
25+
LocalConfName: "./hconf.yaml",
26+
UseLocal: false,
2727
}
2828
for _, o := range opt {
2929
o(opts)
3030
}
3131
return opts
3232
}
3333

34+
func (p *Options) UseLocalConf() bool {
35+
return p.UseLocal
36+
}
37+
3438
func SetEtcdConf(conf clientv3.Config) RegisterOptions {
3539
return func(options *Options) {
3640
options.EtcdConf = conf
@@ -42,15 +46,20 @@ func SetUseLocal() RegisterOptions {
4246
options.UseLocal = true
4347
}
4448
}
49+
func SetEtcdReadTimeOut(t time.Duration) RegisterOptions {
50+
return func(options *Options) {
51+
options.EtcdReadTimeOut = t
52+
}
53+
}
4554

4655
func SetLocalConfName(name string) RegisterOptions {
4756
return func(options *Options) {
4857
options.LocalConfName = name
4958
}
5059
}
5160

52-
func SetWatchRoot(name []string) RegisterOptions {
61+
func SetWatchRootName(name []string) RegisterOptions {
5362
return func(options *Options) {
54-
options.WatchRoot = name
63+
options.WatchRootName = name
5564
}
5665
}

0 commit comments

Comments
 (0)