Skip to content

Commit 992c531

Browse files
Merge pull request #121 from Netflix/more-effective-pooling
More effective pooling
2 parents 1d5d897 + f3a0338 commit 992c531

File tree

10 files changed

+61
-48
lines changed

10 files changed

+61
-48
lines changed

client/.gitignore

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
./blast
2-
./fill
3-
./setget
4-
./setops
5-
./sizes
1+
/blast
2+
/fill
3+
/setget
4+
/setops
5+
/sizes

client/binprot/headers.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var bufPool = &sync.Pool{
7777

7878
var resPool = &sync.Pool{
7979
New: func() interface{} {
80-
return res{}
80+
return &res{}
8181
},
8282
}
8383

@@ -125,20 +125,20 @@ type res struct {
125125
CAS uint64
126126
}
127127

128-
func readRes(r io.Reader) (res, error) {
128+
func readRes(r io.Reader) (*res, error) {
129129
buf := bufPool.Get().([]byte)
130130

131131
if _, err := io.ReadAtLeast(r, buf, 24); err != nil {
132132
bufPool.Put(buf)
133-
return res{}, err
133+
return nil, err
134134
}
135135

136136
if buf[0] != 0x81 {
137137
bufPool.Put(buf)
138-
return res{}, errors.New("Bad Magic")
138+
return nil, errors.New("Bad Magic")
139139
}
140140

141-
res := resPool.Get().(res)
141+
res := resPool.Get().(*res)
142142
res.Magic = buf[0]
143143
res.Opcode = buf[1]
144144
res.KeyLen = uint16(buf[2])<<8 | uint16(buf[3])

handlers/memcached/chunked/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ var (
9696
MetricCmdPrependMissesTokenL2 = metrics.AddCounter("cmd_prepend_misses_token_l2", nil)
9797
)
9898

99-
func readResponseHeader(r *bufio.Reader) (binprot.ResponseHeader, error) {
99+
func readResponseHeader(r *bufio.Reader) (*binprot.ResponseHeader, error) {
100100
resHeader, err := binprot.ReadResponseHeader(r)
101101
if err != nil {
102-
return binprot.ResponseHeader{}, err
102+
return nil, err
103103
}
104104

105105
if err := binprot.DecodeError(resHeader); err != nil {

handlers/memcached/std/handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ import (
2323
"github.com/netflix/rend/protocol/binprot"
2424
)
2525

26-
func readResponseHeader(r *bufio.Reader) (binprot.ResponseHeader, error) {
26+
func readResponseHeader(r *bufio.Reader) (*binprot.ResponseHeader, error) {
2727
resHeader, err := binprot.ReadResponseHeader(r)
2828
if err != nil {
29-
return binprot.ResponseHeader{}, err
29+
return nil, err
3030
}
3131

3232
if err := binprot.DecodeError(resHeader); err != nil {

orcas/locked.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func LockedWithExisting(oc OrcaConst, locksetID uint32) OrcaConst {
125125

126126
func (l *LockedOrca) getlock(key []byte, read bool) sync.Locker {
127127
h := l.hpool.Get().(hash.Hash32)
128+
defer l.hpool.Put(h)
128129
h.Reset()
129130

130131
// Calculate bucket using hash and mod. hash.Hash.Write() never returns an error.

protocol/binprot/headers.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package binprot
1616

1717
import (
1818
"encoding/binary"
19+
"fmt"
1920
"io"
2021
"sync"
2122

@@ -58,8 +59,8 @@ type ResponseHeader struct {
5859
CASToken uint64
5960
}
6061

61-
func makeRequestHeader(opcode uint8, keyLength, extraLength, totalBodyLength int, opaque uint32) RequestHeader {
62-
rh := reqHeadPool.Get().(RequestHeader)
62+
func makeRequestHeader(opcode uint8, keyLength, extraLength, totalBodyLength int, opaque uint32) *RequestHeader {
63+
rh := reqHeadPool.Get().(*RequestHeader)
6364
rh.Magic = MagicRequest
6465
rh.Opcode = opcode
6566
rh.KeyLength = uint16(keyLength)
@@ -81,42 +82,40 @@ var bufPool = &sync.Pool{
8182

8283
var resHeadPool = &sync.Pool{
8384
New: func() interface{} {
84-
return ResponseHeader{}
85+
return new(ResponseHeader)
8586
},
8687
}
8788

88-
func PutResponseHeader(rh ResponseHeader) {
89+
// PutResponseHeader returns the response header to the pool
90+
func PutResponseHeader(rh *ResponseHeader) {
8991
resHeadPool.Put(rh)
9092
}
9193

9294
var reqHeadPool = &sync.Pool{
9395
New: func() interface{} {
94-
return RequestHeader{}
96+
return new(RequestHeader)
9597
},
9698
}
9799

98-
var (
99-
emptyResHeader = ResponseHeader{}
100-
emptyReqHeader = RequestHeader{}
101-
)
102-
103-
func readRequestHeader(r io.Reader) (RequestHeader, error) {
100+
func readRequestHeader(r io.Reader) (*RequestHeader, error) {
104101
buf := bufPool.Get().([]byte)
105102

106103
br, err := io.ReadAtLeast(r, buf, ReqHeaderLen)
107104
metrics.IncCounterBy(common.MetricBytesReadRemote, uint64(br))
108105
if err != nil {
109106
bufPool.Put(buf)
110-
return emptyReqHeader, err
107+
return nil, err
111108
}
112109

113110
if buf[0] != MagicRequest {
111+
fmt.Printf("%#v\n", buf)
114112
bufPool.Put(buf)
115113
metrics.IncCounter(MetricBinaryRequestHeadersBadMagic)
116-
return emptyReqHeader, ErrBadMagic
114+
return nil, ErrBadMagic
117115
}
118116

119-
rh := reqHeadPool.Get().(RequestHeader)
117+
rh := reqHeadPool.Get().(*RequestHeader)
118+
120119
rh.Magic = buf[0]
121120
rh.Opcode = buf[1]
122121
rh.KeyLength = binary.BigEndian.Uint16(buf[2:4])
@@ -139,7 +138,7 @@ func readRequestHeader(r io.Reader) (RequestHeader, error) {
139138
return rh, nil
140139
}
141140

142-
func writeRequestHeader(w io.Writer, rh RequestHeader) error {
141+
func writeRequestHeader(w io.Writer, rh *RequestHeader) error {
143142
buf := bufPool.Get().([]byte)
144143

145144
buf[0] = rh.Magic
@@ -164,23 +163,23 @@ func writeRequestHeader(w io.Writer, rh RequestHeader) error {
164163
return err
165164
}
166165

167-
func ReadResponseHeader(r io.Reader) (ResponseHeader, error) {
166+
func ReadResponseHeader(r io.Reader) (*ResponseHeader, error) {
168167
buf := bufPool.Get().([]byte)
169168

170169
br, err := io.ReadAtLeast(r, buf, resHeaderLen)
171170
metrics.IncCounterBy(common.MetricBytesReadRemote, uint64(br))
172171
if err != nil {
173172
bufPool.Put(buf)
174-
return emptyResHeader, err
173+
return nil, err
175174
}
176175

177176
if buf[0] != MagicResponse {
178177
bufPool.Put(buf)
179178
metrics.IncCounter(MetricBinaryResponseHeadersBadMagic)
180-
return emptyResHeader, ErrBadMagic
179+
return nil, ErrBadMagic
181180
}
182181

183-
rh := resHeadPool.Get().(ResponseHeader)
182+
rh := resHeadPool.Get().(*ResponseHeader)
184183
rh.Magic = buf[0]
185184
rh.Opcode = buf[1]
186185
rh.KeyLength = binary.BigEndian.Uint16(buf[2:4])
@@ -201,7 +200,7 @@ func ReadResponseHeader(r io.Reader) (ResponseHeader, error) {
201200
return rh, nil
202201
}
203202

204-
func writeResponseHeader(w io.Writer, rh ResponseHeader) error {
203+
func writeResponseHeader(w io.Writer, rh *ResponseHeader) error {
205204
buf := bufPool.Get().([]byte)
206205

207206
buf[0] = rh.Magic

protocol/binprot/parser.go

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,13 @@ func (b BinaryParser) Parse() (common.Request, common.RequestType, uint64, error
129129
// read in the full header before any variable length fields
130130
reqHeader, err := readRequestHeader(b.reader)
131131
start := timer.Now()
132-
defer reqHeadPool.Put(reqHeader)
133132

134133
if err != nil {
135134
return nil, common.RequestUnknown, start, err
136135
}
137136

137+
defer reqHeadPool.Put(reqHeader)
138+
138139
switch reqHeader.Opcode {
139140
case OpcodeSet:
140141
return setRequest(b.reader, reqHeader, common.RequestSet, false, start)
@@ -291,13 +292,15 @@ func (b BinaryParser) Parse() (common.Request, common.RequestType, uint64, error
291292
return nil, common.RequestUnknown, start, common.ErrUnknownCmd
292293
}
293294

294-
func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error) {
295+
func readBatchGet(r io.Reader, header *RequestHeader) (common.GetRequest, error) {
295296
var keys [][]byte
296297
var opaques []uint32
297298
var quiet []bool
298299
var noopOpaque uint32
299300
var noopEnd bool
300301

302+
first := true
303+
301304
// while GETQ
302305
// read key, read header
303306
for header.Opcode == OpcodeGetQ {
@@ -312,7 +315,11 @@ func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error)
312315
quiet = append(quiet, true)
313316

314317
// read in the next header
315-
reqHeadPool.Put(header)
318+
if !first {
319+
reqHeadPool.Put(header)
320+
} else {
321+
first = false
322+
}
316323
header, err = readRequestHeader(r)
317324
if err != nil {
318325
return common.GetRequest{}, err
@@ -354,13 +361,15 @@ func readBatchGet(r io.Reader, header RequestHeader) (common.GetRequest, error)
354361
}, nil
355362
}
356363

357-
func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error) {
364+
func readBatchGetE(r io.Reader, header *RequestHeader) (common.GetRequest, error) {
358365
var keys [][]byte
359366
var opaques []uint32
360367
var quiet []bool
361368
var noopOpaque uint32
362369
var noopEnd bool
363370

371+
first := true
372+
364373
// while GETQ
365374
// read key, read header
366375
for header.Opcode == OpcodeGetEQ {
@@ -375,7 +384,11 @@ func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error)
375384
quiet = append(quiet, true)
376385

377386
// read in the next header
378-
reqHeadPool.Put(header)
387+
if !first {
388+
reqHeadPool.Put(header)
389+
} else {
390+
first = false
391+
}
379392
header, err = readRequestHeader(r)
380393
if err != nil {
381394
return common.GetRequest{}, err
@@ -417,7 +430,7 @@ func readBatchGetE(r io.Reader, header RequestHeader) (common.GetRequest, error)
417430
}, nil
418431
}
419432

420-
func setRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
433+
func setRequest(r io.Reader, reqHeader *RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
421434
// flags, exptime, key, value
422435
flags, err := readUInt32(r)
423436
if err != nil {
@@ -459,7 +472,7 @@ func setRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType
459472
}, reqType, start, nil
460473
}
461474

462-
func appendPrependRequest(r io.Reader, reqHeader RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
475+
func appendPrependRequest(r io.Reader, reqHeader *RequestHeader, reqType common.RequestType, quiet bool, start uint64) (common.SetRequest, common.RequestType, uint64, error) {
463476
// key, value
464477
key, err := readString(r, reqHeader.KeyLength)
465478
if err != nil {

protocol/binprot/parser_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ func (d dummyIO) Write(p []byte) (int, error) {
5959
}
6060

6161
var (
62-
reqHeaderBenchmarkSink RequestHeader
63-
resHeaderBenchmarkSink ResponseHeader
62+
reqHeaderBenchmarkSink *RequestHeader
63+
resHeaderBenchmarkSink *ResponseHeader
6464
errBenchmarkSink error
6565
)
6666

@@ -71,7 +71,7 @@ func BenchmarkHeaders(b *testing.B) {
7171
}
7272
})
7373
b.Run("writeRequestHeader", func(b *testing.B) {
74-
temp := RequestHeader{}
74+
temp := &RequestHeader{}
7575
for i := 0; i < b.N; i++ {
7676
errBenchmarkSink = writeRequestHeader(dummyIO{}, temp)
7777
}
@@ -82,7 +82,7 @@ func BenchmarkHeaders(b *testing.B) {
8282
}
8383
})
8484
b.Run("writeResponseHeader", func(b *testing.B) {
85-
temp := ResponseHeader{}
85+
temp := &ResponseHeader{}
8686
for i := 0; i < b.N; i++ {
8787
errBenchmarkSink = writeResponseHeader(dummyIO{}, temp)
8888
}

protocol/binprot/respond.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ func getCommon(w *bufio.Writer, response common.GetResponse, opcode uint8) error
308308
func writeSuccessResponseHeader(w *bufio.Writer, opcode uint8, keyLength, extraLength,
309309
totalBodyLength int, opaque uint32, flush bool) error {
310310

311-
header := resHeadPool.Get().(ResponseHeader)
311+
header := resHeadPool.Get().(*ResponseHeader)
312312

313313
header.Magic = MagicResponse
314314
header.Opcode = opcode
@@ -339,7 +339,7 @@ func writeSuccessResponseHeader(w *bufio.Writer, opcode uint8, keyLength, extraL
339339
}
340340

341341
func writeErrorResponseHeader(w *bufio.Writer, opcode uint8, status uint16, opaque uint32) error {
342-
header := resHeadPool.Get().(ResponseHeader)
342+
header := resHeadPool.Get().(*ResponseHeader)
343343

344344
header.Magic = MagicResponse
345345
header.Opcode = opcode

protocol/binprot/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ const (
8383
StatusInvalid = uint16(0xFFFF)
8484
)
8585

86-
func DecodeError(header ResponseHeader) error {
86+
func DecodeError(header *ResponseHeader) error {
8787
switch header.Status {
8888
case StatusKeyEnoent:
8989
return common.ErrKeyNotFound

0 commit comments

Comments
 (0)