11package reader
22
33import (
4- "RedisShake/internal/client/proto"
54 "bufio"
65 "bytes"
76 "context"
7+ "encoding/json"
88 "fmt"
99 "io"
1010 "os"
@@ -14,6 +14,8 @@ import (
1414 "strings"
1515 "time"
1616
17+ "RedisShake/internal/client/proto"
18+
1719 "RedisShake/internal/client"
1820 "RedisShake/internal/config"
1921 "RedisShake/internal/entry"
@@ -38,6 +40,8 @@ type SyncReaderOptions struct {
3840 Sentinel client.SentinelOptions `mapstructure:"sentinel"`
3941}
4042
43+ const RDB_EOF_MARKER_LEN = 40
44+
4145type State string
4246
4347const (
@@ -48,6 +52,47 @@ const (
4852 kSyncAof State = "syncing aof"
4953)
5054
55+ type syncStandaloneReaderStat struct {
56+ Name string `json:"name"`
57+ Address string `json:"address"`
58+ Dir string `json:"dir"`
59+
60+ // status
61+ Status State `json:"status"`
62+
63+ // rdb info
64+ RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
65+ RdbFileSizeHuman string `json:"rdb_file_size_human"`
66+ RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
67+ RdbReceivedHuman string `json:"rdb_received_human"`
68+ RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
69+ RdbSentHuman string `json:"rdb_sent_human"`
70+
71+ // aof info
72+ AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
73+ AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
74+ AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master
75+ AofReceivedHuman string `json:"aof_received_human"`
76+ }
77+
78+ func (s syncStandaloneReaderStat ) MarshalJSON () ([]byte , error ) {
79+ if s .RdbFileSizeBytes != 0 {
80+ s .RdbFileSizeHuman = humanize .IBytes (s .RdbFileSizeBytes )
81+ }
82+ if s .RdbReceivedBytes != 0 {
83+ s .RdbReceivedHuman = humanize .IBytes (s .RdbReceivedBytes )
84+ }
85+ if s .RdbSentBytes != 0 {
86+ s .RdbSentHuman = humanize .IBytes (s .RdbSentBytes )
87+ }
88+ if s .AofReceivedBytes != 0 {
89+ s .AofReceivedHuman = humanize .IBytes (s .AofReceivedBytes )
90+ }
91+
92+ type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion
93+ return json .Marshal (aliasStat (s ))
94+ }
95+
5196type syncStandaloneReader struct {
5297 ctx context.Context
5398 opts * SyncReaderOptions
@@ -56,28 +101,7 @@ type syncStandaloneReader struct {
56101 ch chan * entry.Entry
57102 DbId int
58103
59- stat struct {
60- Name string `json:"name"`
61- Address string `json:"address"`
62- Dir string `json:"dir"`
63-
64- // status
65- Status State `json:"status"`
66-
67- // rdb info
68- RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
69- RdbFileSizeHuman string `json:"rdb_file_size_human"`
70- RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
71- RdbReceivedHuman string `json:"rdb_received_human"`
72- RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
73- RdbSentHuman string `json:"rdb_sent_human"`
74-
75- // aof info
76- AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
77- AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
78- AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
79- AofReceivedHuman string `json:"aof_received_human"`
80- }
104+ stat syncStandaloneReaderStat
81105
82106 // version info
83107 isDiskless bool
@@ -319,7 +343,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
319343 }
320344 timeStart = time .Now ()
321345 log .Debugf ("[%s] start receiving RDB. path=[%s]" , r .stat .Name , rdbFilePath )
322- rdbFileHandle , err := os .OpenFile (rdbFilePath , os .O_WRONLY | os .O_CREATE | os .O_TRUNC , 0666 )
346+ rdbFileHandle , err := os .OpenFile (rdbFilePath , os .O_WRONLY | os .O_CREATE | os .O_TRUNC , 0o666 )
323347 if err != nil {
324348 log .Panicf (err .Error ())
325349 }
@@ -345,39 +369,44 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
345369 buf := make ([]byte , bufSize )
346370
347371 marker = strings .Split (marker , ":" )[1 ]
348- if len (marker ) != 40 {
372+ if len (marker ) != RDB_EOF_MARKER_LEN {
349373 log .Panicf ("[%s] invalid len of EOF marker. value=[%s]" , r .stat .Name , marker )
350374 }
351375 log .Infof ("meet EOF begin marker: %s" , marker )
352376 bMarker := []byte (marker )
353- goon := true
354- for goon {
355- n , err := r .client .Read (buf [:bufSize ])
377+ var lastBytes []byte
378+ for {
379+ copy (buf , lastBytes ) // copy previous tail bytes to head of buf
380+
381+ nread , err := r .client .Read (buf [len (lastBytes ):])
356382 if err != nil {
357383 log .Panicf (err .Error ())
358384 }
359- buffer := buf [:n ]
360- if bytes .Contains (buffer , bMarker ) {
385+
386+ bufLen := len (lastBytes ) + nread
387+ nwrite := 0
388+ if bufLen >= RDB_EOF_MARKER_LEN && bytes .Equal (buf [bufLen - RDB_EOF_MARKER_LEN :bufLen ], bMarker ) {
361389 log .Infof ("meet EOF end marker." )
362- // replace it
363- fi := bytes .Index (buffer , bMarker )
364- if len (buffer [fi + 40 :]) > 0 {
365- log .Warnf ("data after end marker will be discarded: %s" , string (buffer [fi + 40 :]))
390+ // Write all buf without EOF marker and break
391+ if nwrite , err = wt .Write (buf [:bufLen - RDB_EOF_MARKER_LEN ]); err != nil {
392+ log .Panicf (err .Error ())
366393 }
367- buffer = buffer [:fi ]
368-
369- goon = false
394+ break
370395 }
371396
372- _ , err = wt .Write (buffer )
373- if err != nil {
374- log .Panicf (err .Error ())
397+ if bufLen >= RDB_EOF_MARKER_LEN {
398+ // left RDB_EOF_MARKER_LEN bytes to next round
399+ if nwrite , err = wt .Write (buf [:bufLen - RDB_EOF_MARKER_LEN ]); err != nil {
400+ log .Panicf (err .Error ())
401+ }
402+ lastBytes = buf [bufLen - RDB_EOF_MARKER_LEN : bufLen ] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round
403+ } else {
404+ // save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN
405+ lastBytes = buf [:bufLen ]
375406 }
376407
377- r .stat .RdbFileSizeBytes += int64 (n )
378- r .stat .RdbFileSizeHuman = humanize .IBytes (uint64 (r .stat .RdbFileSizeBytes ))
379- r .stat .RdbReceivedBytes += int64 (n )
380- r .stat .RdbReceivedHuman = humanize .IBytes (uint64 (r .stat .RdbReceivedBytes ))
408+ r .stat .RdbFileSizeBytes += uint64 (nwrite )
409+ r .stat .RdbReceivedBytes += uint64 (nwrite )
381410 }
382411}
383412
@@ -387,8 +416,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
387416 log .Panicf (err .Error ())
388417 }
389418 log .Debugf ("[%s] rdb file size: [%v]" , r .stat .Name , humanize .IBytes (uint64 (length )))
390- r .stat .RdbFileSizeBytes = length
391- r .stat .RdbFileSizeHuman = humanize .IBytes (uint64 (length ))
419+ r .stat .RdbFileSizeBytes = uint64 (length )
392420
393421 remainder := length
394422 const bufSize int64 = 32 * 1024 * 1024 // 32MB
@@ -408,8 +436,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
408436 log .Panicf (err .Error ())
409437 }
410438
411- r .stat .RdbReceivedBytes += int64 (n )
412- r .stat .RdbReceivedHuman = humanize .IBytes (uint64 (r .stat .RdbReceivedBytes ))
439+ r .stat .RdbReceivedBytes += uint64 (n )
413440 }
414441}
415442
@@ -427,8 +454,7 @@ func (r *syncStandaloneReader) receiveAOF() {
427454 if err != nil {
428455 log .Panicf (err .Error ())
429456 }
430- r .stat .AofReceivedBytes += int64 (n )
431- r .stat .AofReceivedHuman = humanize .IBytes (uint64 (r .stat .AofReceivedBytes ))
457+ r .stat .AofReceivedBytes += uint64 (n )
432458 aofWriter .Write (buf [:n ])
433459 r .stat .AofReceivedOffset += int64 (n )
434460 }
@@ -440,8 +466,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
440466 log .Debugf ("[%s] start sending RDB to target" , r .stat .Name )
441467 r .stat .Status = kSyncRdb
442468 updateFunc := func (offset int64 ) {
443- r .stat .RdbSentBytes = offset
444- r .stat .RdbSentHuman = humanize .IBytes (uint64 (offset ))
469+ r .stat .RdbSentBytes = uint64 (offset )
445470 }
446471 rdbLoader := rdb .NewLoader (r .stat .Name , updateFunc , rdbFilePath , r .ch )
447472 r .DbId = rdbLoader .ParseRDB (r .ctx )
@@ -532,16 +557,16 @@ func (r *syncStandaloneReader) Status() interface{} {
532557
533558func (r * syncStandaloneReader ) StatusString () string {
534559 if r .stat .Status == kSyncRdb {
535- return fmt .Sprintf ("%s, size=[%s/%s]" , r .stat .Status , r .stat .RdbSentHuman , r .stat .RdbFileSizeHuman )
560+ return fmt .Sprintf ("%s, size=[%s/%s]" , r .stat .Status , humanize . IBytes ( r .stat .RdbSentBytes ), humanize . IBytes ( r .stat .RdbFileSizeBytes ) )
536561 }
537562 if r .stat .Status == kSyncAof {
538563 return fmt .Sprintf ("%s, diff=[%v]" , r .stat .Status , - r .stat .AofSentOffset + r .stat .AofReceivedOffset )
539564 }
540565 if r .stat .Status == kReceiveRdb {
541566 if r .isDiskless {
542- return fmt .Sprintf ("%s diskless, size=[%s]" , r .stat .Status , r .stat .RdbReceivedHuman )
567+ return fmt .Sprintf ("%s diskless, size=[%s]" , r .stat .Status , humanize . IBytes ( r .stat .RdbReceivedBytes ) )
543568 }
544- return fmt .Sprintf ("%s, size=[%s/%s]" , r .stat .Status , r .stat .RdbReceivedHuman , r .stat .RdbFileSizeHuman )
569+ return fmt .Sprintf ("%s, size=[%s/%s]" , r .stat .Status , humanize . IBytes ( r .stat .RdbReceivedBytes ), humanize . IBytes ( r .stat .RdbFileSizeBytes ) )
545570 }
546571 return string (r .stat .Status )
547572}
0 commit comments