Skip to content

Commit 6db448d

Browse files
committed
Merge branch 'GoRedisDev'
2 parents e930741 + 06e8023 commit 6db448d

File tree

11 files changed

+121
-37
lines changed

11 files changed

+121
-37
lines changed

.gitignore

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
2-
31
.DS_Store
42

53
main/goredis-server

doc/release notes.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
11
GoRedis release notes
22
=====================
33

4+
**SlaveOf 1.0.4** @2014.4.11
5+
6+
* [Feature] 支持配置dbpath
7+
48
**Proxy 1.0.4** @2014.3.30
59

610
* [Feature] 实现基本功能
711

12+
13+
**GoRedis 1.0.69** @2014.4.10
14+
* [Fix] 修正PrefixEnumerate时全部遍历没有quit
15+
* [ADD] 启动参数新增logpath、datapath用于自定义数据和Log路径
16+
17+
818
**GoRedis 1.0.68** @2014.3.30
919

1020
* [Fix] 修正SADD返回值

goredis/session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
// Session继承了net.Conn,代表一个客户端会话
1616
// 提供各种标准的Reply方法, Status/Error/Integer/Bulk/MultiBulks
1717
// cmd, err := session.ReadCommand()
18-
// session.ReplyReply(StatusReply("OK"))
18+
// session.WriteReply(StatusReply("OK"))
1919
// 协议参考:http://redis.io/topics/protocol
2020
type Session struct {
2121
net.Conn

goredis_proxy/goredis_proxy.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (server *GoRedisProxy) On(session *Session, cmd *Command) (reply *Reply) {
8888
// 写入主库
8989
if server.master.Available() {
9090
if server.options.CanWrite() {
91-
reply, err = server.master.Invoke(session, cmd)
91+
reply, err = server.master.Send(session, cmd)
9292
session.SetAttribute(S_LAST_WRITE_KEY, key)
9393
} else {
9494
err = errors.New("reject write")
@@ -107,7 +107,7 @@ func (server *GoRedisProxy) On(session *Session, cmd *Command) (reply *Reply) {
107107
remotes[0], remotes[1] = remotes[1], remotes[0]
108108
}
109109
for _, remote := range remotes {
110-
reply, err = remote.Invoke(session, cmd)
110+
reply, err = remote.Send(session, cmd)
111111
session.SetAttribute(S_LAST_WRITE_KEY, "")
112112
if err == nil {
113113
break
@@ -132,6 +132,7 @@ func (server *GoRedisProxy) invokeCommand(session *Session, cmd *Command) (reply
132132
case "PING":
133133
return StatusReply("PONG")
134134
}
135+
// 禁止执行某些危险指令
135136
if ignoreSync[cmdName] {
136137
return ErrorReply("not support")
137138
}

goredis_proxy/remote_session.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,29 @@ func (s *RemoteSession) secondTicker() {
6666
}
6767

6868
// 发送指令到远程Redis,并返回结果
69-
func (s *RemoteSession) Invoke(session *Session, cmd *Command) (reply *Reply, err error) {
69+
func (s *RemoteSession) Send(session *Session, cmd *Command) (reply *Reply, err error) {
7070
if !s.available {
7171
err = errors.New("unavailable")
7272
return
7373
}
7474
i := s.indexOf([]byte(session.RemoteAddr().String()))
75-
s.counters.Get("total").Incr(1)
7675
// lock
7776
s.mus[i].Lock()
7877
defer s.mus[i].Unlock()
78+
79+
s.counters.Get("total").Incr(1)
7980
// redirect
8081
err = s.sessions[i].WriteCommand(cmd)
8182
if err == nil {
8283
reply, err = s.sessions[i].ReadReply()
8384
}
8485
if err != nil {
85-
s.available = false
86+
s.counters.Get("error").Incr(1)
87+
s.sessions[i].Close()
88+
s.sessions[i], err = s.createSession() // reconnect
89+
if err != nil {
90+
s.available = false
91+
}
8692
}
8793
return
8894
}
@@ -113,7 +119,7 @@ func (s *RemoteSession) Close() {
113119

114120
func (s *RemoteSession) createSession() (session *Session, err error) {
115121
var conn net.Conn
116-
conn, err = net.Dial("tcp", s.host)
122+
conn, err = net.DialTimeout("tcp", s.host, time.Millisecond*1000)
117123
if err != nil {
118124
return
119125
}

goredis_server/go_redis_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
)
1818

1919
// 版本号,每次更新都需要增加
20-
const VERSION = "1.0.68"
20+
const VERSION = "1.0.69"
2121
const PREFIX = "__goredis:"
2222

2323
var (

libs/levelredis/level_redis.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,12 +376,22 @@ func (l *LevelRedis) Keys(prefix []byte, fn func(i int, key, keytype []byte, qui
376376
// 前缀扫描
377377
func (l *LevelRedis) PrefixEnumerate(prefix []byte, direction IterDirection, fn func(i int, key, value []byte, quit *bool)) {
378378
min := prefix
379+
/**
380+
*
381+
*/
379382
max := append(prefix, MAXBYTE)
380383
j := -1
381384
l.RangeEnumerate(min, max, direction, func(i int, key, value []byte, quit *bool) {
382385
if bytes.HasPrefix(key, prefix) {
383386
j++
384387
fn(j, key, value, quit)
388+
} else {
389+
/**
390+
* 根据leveldb 的 key有序,因此具有相同前缀的key必定是在一起的
391+
* 所以一旦碰见了没有该前缀的key那么就直接退出,结束遍历
392+
*/
393+
394+
*quit = true
385395
}
386396
})
387397
return

main/goredis-server.go

Lines changed: 35 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ func main() {
2828
slaveof := flag.String("slaveof", "", "replication")
2929
procs := flag.Int("procs", 8, "GOMAXPROCS")
3030
repair := flag.Bool("repair", false, "repaire rocksdb")
31-
logpath := flag.String("logpath", "/home/data", "config goredis log path and synclog path")
32-
31+
datapath := flag.String("datapath", "/data", "config goredis data path default path [/data/goredis_${port}/]")
32+
logpath := flag.String("logpath", "/home/logs/", "config goredis log path and synclog path ,default path [/home/logs/goredis_${port}]")
3333
flag.Parse()
3434

3535
if *version {
@@ -41,9 +41,9 @@ func main() {
4141

4242
opt := goredis_server.NewOptions()
4343
opt.SetBind(fmt.Sprintf("%s:%d", *host, *port))
44-
45-
dbhome := dbHome(*port)
44+
dbhome := dbHome(*datapath, *port)
4645
opt.SetDirectory(dbhome)
46+
4747
if len(*slaveof) > 0 {
4848
h, p, e := splitHostPort(*slaveof)
4949
if e != nil {
@@ -53,7 +53,9 @@ func main() {
5353
}
5454

5555
// 重定向日志输出位置
56-
opt.SetLogDir(redirectLogOutput(*logpath, *port, dbhome))
56+
logdir := redirectLogOutput(*logpath, *port)
57+
opt.SetLogDir(logdir)
58+
stdlog.Println("logdir:[" + logdir + "]\tdbhome:[" + dbhome + "]")
5759

5860
// repair
5961
if *repair {
@@ -88,35 +90,42 @@ func init() {
8890
})
8991
}
9092

91-
// 主路径,默认在/data下创建,否则在/tmp下
92-
func dbHome(port int) string {
93-
dbhome := "/data"
94-
finfo, e1 := os.Stat(dbhome)
95-
if os.IsNotExist(e1) || !finfo.IsDir() {
96-
dbhome = "/tmp"
93+
/**
94+
* 构建dbhome,使用datapath中的路径
95+
*/
96+
func dbHome(datapath string, port int) string {
97+
98+
dbhome := fmt.Sprintf("%s/goredis_%d/", datapath, port)
99+
100+
finfo, err := os.Stat(dbhome)
101+
if os.IsNotExist(err) || !finfo.IsDir() {
102+
os.MkdirAll(dbhome, os.ModePerm)
97103
}
98-
directory := fmt.Sprintf("%s/goredis_%d/", dbhome, port)
99-
os.MkdirAll(directory, os.ModePerm)
100-
return directory
104+
return dbhome
101105
}
102106

103-
// 将Stdout, Stderr重定向到指定文件
104-
func redirectLogOutput(directory string, port int, defdir string) string {
107+
/**
108+
* 将Stdout, Stderr重定向到指定文件
109+
* 并返回当前日志路径
110+
*/
111+
func redirectLogOutput(directory string, port int) string {
105112

106-
logpath := directory
113+
oldout := os.Stdout
107114

108-
loginfo, err := os.Stat(directory)
109-
//如果没有就是用默认的即 dbhome
115+
logpath := fmt.Sprintf("%s/goredis_%d/", directory, port)
116+
117+
loginfo, err := os.Stat(logpath)
118+
119+
/**
120+
* 如果logpath不存在
121+
* 则创建
122+
*/
110123
if os.IsNotExist(err) || !loginfo.IsDir() {
111-
directory = defdir
124+
os.MkdirAll(logpath, os.ModePerm)
112125
}
113126

114-
logpath = fmt.Sprintf("%s/logs/%d/", logpath, port)
115-
os.MkdirAll(logpath, os.ModePerm)
127+
os.Stdout, err = os.OpenFile(logpath+"stdout.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
116128

117-
oldout := os.Stdout
118-
119-
os.Stdout, err = os.OpenFile(logpath+"/stdout.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, os.ModePerm)
120129
if err != nil {
121130
panic(err)
122131
}
@@ -127,6 +136,7 @@ func redirectLogOutput(directory string, port int, defdir string) string {
127136
if err != nil {
128137
panic(err)
129138
}
139+
130140
return logpath
131141
}
132142

main/slaveof-proxy.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func main() {
1717
pullrate := flag.Int("pullrate", 400, "pull rate in Mbits/s")
1818
pushrate := flag.Int("pushrate", 400, "push rate in Mbits/s")
1919
buffer := flag.Int("buffer", 100, "buffer x10000 records")
20+
dbpath := flag.String("dbpath", "/tmp", "rdb path")
2021
flag.Parse()
2122

2223
if *pullrate < 100 {
@@ -31,7 +32,12 @@ func main() {
3132
*buffer = 1000
3233
}
3334

34-
stdlog.Println("slaveof-proxy 1.0.3")
35+
stdlog.Println("slaveof-proxy 1.0.4")
36+
if len(*src) == 0 || len(*dest) == 0 {
37+
stdlog.Println("Usage: ./slaveof-proxy -src master:port -dest slave:6379 -pullrate 400 -pushrate 400 -buffer 100 -dbpath /tmp")
38+
return
39+
}
40+
3541
stdlog.Printf("from [%s] to [%s]\n", *src, *dest)
3642
stdlog.Printf("pull [%d] buffer [%d]\n", *pullrate, *buffer)
3743
stdlog.Println("SYNC ...")
@@ -43,7 +49,11 @@ func main() {
4349
}
4450
client.SetPullRate(*pullrate / 8 * 1024 * 1024)
4551
client.SetPushRate(*pushrate / 8 * 1024 * 1024)
46-
client.Sync()
52+
client.SetDbPath(*dbpath)
53+
err = client.Sync()
54+
if err != nil {
55+
panic(err)
56+
}
4757
}
4858

4959
func init() {

main/tool/slaveof/slave_client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func (s *SlaveClient) SetPushRate(n int) {
7777
s.pushrate = n
7878
}
7979

80+
func (s *SlaveClient) SetDbPath(dbpath string) {
81+
s.directory = dbpath
82+
}
83+
8084
func (s *SlaveClient) Writer() io.Writer {
8185
if s.destwriter == nil {
8286
s.destwriter = ratelimit.NewRateLimiter(s.Dest(), s.pushrate)

0 commit comments

Comments
 (0)