Skip to content

Commit 0d62676

Browse files
author
keimoon
committed
Added config, sentinel
1 parent 6707dfa commit 0d62676

File tree

7 files changed

+330
-47
lines changed

7 files changed

+330
-47
lines changed

config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package gore
2+
3+
// Config keeps some default configurations. Time is measured in second
4+
var Config = &struct {
5+
ConnectTimeout int
6+
RequestTimeout int
7+
ReconnectTime int
8+
PoolInitialSize int
9+
PoolMaximumSize int
10+
}{
11+
ConnectTimeout: 5,
12+
RequestTimeout: 10,
13+
ReconnectTime: 2,
14+
PoolInitialSize: 5,
15+
PoolMaximumSize: 10,
16+
}

conn.go

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,30 @@ type Conn struct {
2121
mutex sync.Mutex
2222
rb *bufio.Reader
2323
wb *bufio.Writer
24+
sentinel bool
2425
RequestTimeout time.Duration
2526
}
2627

2728
// Dial opens a TCP connection with a redis server.
2829
func Dial(address string) (*Conn, error) {
2930
conn := &Conn{
30-
RequestTimeout: 10 * time.Second,
31+
RequestTimeout: time.Duration(Config.RequestTimeout) * time.Second,
3132
}
3233
conn.mutex.Lock()
33-
defer conn.mutex.Unlock()
34+
defer conn.mutex.Unlock()
3435
err := conn.connect(address, 0)
3536
return conn, err
3637
}
3738

3839
// DialTimeout opens a TCP connection with a redis server with a connection timeout
3940
func DialTimeout(address string, timeout time.Duration) (*Conn, error) {
40-
conn := &Conn{
41-
RequestTimeout: 10 * time.Second,
42-
}
41+
conn := &Conn{
42+
RequestTimeout: time.Duration(Config.RequestTimeout) * time.Second,
43+
}
4344
conn.mutex.Lock()
44-
defer conn.mutex.Unlock()
45-
err := conn.connect(address, timeout)
46-
return conn, err
45+
defer conn.mutex.Unlock()
46+
err := conn.connect(address, timeout)
47+
return conn, err
4748
}
4849

4950
// Close closes the connection
@@ -92,21 +93,19 @@ func (c *Conn) connect(address string, timeout time.Duration) error {
9293
}
9394

9495
func (c *Conn) fail() {
95-
c.reconnect()
96-
}
97-
98-
func (c *Conn) reconnect() {
99-
c.mutex.Lock()
100-
defer c.mutex.Unlock()
101-
if c.state == connStateReconnecting {
102-
return
96+
if !c.sentinel {
97+
c.mutex.Lock()
98+
defer c.mutex.Unlock()
99+
if c.state == connStateReconnecting {
100+
return
101+
}
102+
c.tcpConn.Close()
103+
c.state = connStateReconnecting
104+
for {
105+
if err := c.connect(c.address, 0); err == nil {
106+
break
107+
}
108+
time.Sleep(time.Duration(Config.ReconnectTime) * time.Second)
109+
}
103110
}
104-
c.tcpConn.Close()
105-
c.state = connStateReconnecting
106-
for {
107-
if err := c.connect(c.address, 0); err == nil {
108-
break
109-
}
110-
time.Sleep(2 * time.Second)
111-
}
112111
}

doc.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ The following slice element types are supported:
8383
- FixInt and VarInt
8484
- *gore.Pair for converting map data from HGETALL or ZRANGE WITHSCORES
8585
86+
Reply returns from HGETALL or SENTINEL master can be converted into a map
87+
using Map:
88+
89+
m, err:= rep.Map()
90+
8691
Pipeline
8792
8893
Gore supports pipelining using gore.Pipeline:

pool.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Pool struct {
2424
cond *sync.Cond
2525
address string
2626
closed bool
27+
sentinel bool
2728
}
2829

2930
// Dial initializes connection from the pool to redis server.
@@ -34,10 +35,10 @@ func (p *Pool) Dial(address string) error {
3435
p.RequestTimeout = 10 * time.Second
3536
}
3637
if p.InitialConn <= 0 {
37-
p.InitialConn = 5
38+
p.InitialConn = Config.PoolInitialSize
3839
}
3940
if p.MaximumConn <= 0 {
40-
p.MaximumConn = 10
41+
p.MaximumConn = Config.PoolMaximumSize
4142
}
4243
if p.MaximumConn < p.InitialConn {
4344
p.MaximumConn = p.InitialConn
@@ -63,6 +64,7 @@ func (p *Pool) Close() {
6364
}
6465
p.l.Init()
6566
p.currentNumberOfConn = p.l.Len()
67+
p.unusableNumberOfConn = 0
6668
p.cond.Broadcast()
6769
}
6870

@@ -137,6 +139,7 @@ func (p *Pool) connect(timeout time.Duration) (err error) {
137139
if err != nil {
138140
return err
139141
}
142+
conn.sentinel = p.sentinel
140143
p.l.PushBack(conn)
141144
}
142145
return nil
@@ -154,10 +157,24 @@ func (p *Pool) pushBack(conn *Conn) {
154157
p.cond.Signal()
155158
p.mutex.Unlock()
156159
break
160+
} else if p.sentinel {
161+
// Give up this conn
162+
conn.Close()
163+
break
157164
} else if !markedUnusable {
158165
markedUnusable = true
159166
p.unusableNumberOfConn++
160167
}
161168
time.Sleep(2 * time.Second)
162169
}
163170
}
171+
172+
func (p *Pool) sentinelGonnaGiveYouUp() {
173+
for p.connect(time.Duration(Config.ConnectTimeout)*time.Second) != nil {
174+
}
175+
p.closed = false
176+
}
177+
178+
func (p *Pool) sentinelGonnaLetYouDown() {
179+
p.Close()
180+
}

pubsub.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type Subscriptions struct {
3535
lock sync.Mutex
3636
ready bool
3737
readyChannel chan bool
38+
// Sentinel will set this to true to handle read error from sentinel server.
39+
throwError bool
3840
}
3941

4042
// NewSubscriptions returns new Subscriptions
@@ -46,6 +48,7 @@ func NewSubscriptions(conn *Conn) *Subscriptions {
4648
messageChannel: make(chan *Message, 100),
4749
ready: true,
4850
readyChannel: make(chan bool, 1),
51+
throwError: false,
4952
}
5053
go s.receive()
5154
return s
@@ -114,7 +117,12 @@ func (s *Subscriptions) receive() {
114117
for {
115118
rep, err := readReply(s.conn)
116119
if err != nil {
120+
if s.throwError {
121+
s.messageChannel <- nil
122+
return
123+
}
117124
s.ready = false
125+
go s.resubscribe()
118126
break
119127
}
120128
if !rep.IsArray() {
@@ -155,10 +163,7 @@ func (s *Subscriptions) do(command string, channel ...string) error {
155163
s.lock.Lock()
156164
defer s.lock.Unlock()
157165
err := NewCommand(command, s.makeArgs(channel...)...).Send(s.conn)
158-
if err != nil {
159-
s.ready = false
160-
go s.resubscribe()
161-
} else {
166+
if err == nil {
162167
switch {
163168
case command[0] == 'S':
164169
for _, ch := range channel {
@@ -211,8 +216,9 @@ func (s *Subscriptions) resubscribe() {
211216
}
212217
err = NewCommand("PSUBSCRIBE", s.makeArgs(pchannels...)...).Send(s.conn)
213218
if err != nil {
214-
continue
215-
}
219+
continue
220+
}
221+
break
216222
}
217223
s.ready = true
218224
s.readyChannel <- true

reply.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,34 @@ func (r *Reply) Slice(s interface{}) error {
254254
return nil
255255
}
256256

257+
// Map converts the reply into a map[string]string.
258+
// It will return error unless the reply is an array reply from HGETALL,
259+
// or SENTINEL master
260+
func (r *Reply) Map() (map[string]string, error) {
261+
if r.IsNil() {
262+
return nil, ErrNil
263+
}
264+
if !r.IsArray() {
265+
return nil, ErrType
266+
}
267+
if len(r.arrayValue)%2 != 0 {
268+
return nil, ErrType
269+
}
270+
m := make(map[string]string)
271+
for i := 0; i < len(r.arrayValue)/2; i++ {
272+
first, err := r.arrayValue[2*i].String()
273+
if err != nil {
274+
continue
275+
}
276+
second, err := r.arrayValue[2*i+1].String()
277+
if err != nil && err != ErrNil {
278+
continue
279+
}
280+
m[first] = second
281+
}
282+
return m, nil
283+
}
284+
257285
// Error returns error message
258286
func (r *Reply) Error() (string, error) {
259287
if r.Type() != ReplyError {
@@ -301,10 +329,10 @@ func (r *Reply) IsError() bool {
301329
func Receive(conn *Conn) (r *Reply, err error) {
302330
conn.Lock()
303331
defer func() {
304-
conn.Unlock()
305-
if err != nil {
306-
conn.fail()
307-
}
332+
conn.Unlock()
333+
if err != nil {
334+
conn.fail()
335+
}
308336
}()
309337
if conn.RequestTimeout != 0 {
310338
conn.tcpConn.SetReadDeadline(time.Now().Add(conn.RequestTimeout))

0 commit comments

Comments
 (0)