Skip to content

Commit 73fad62

Browse files
author
Vladimir Smirnov
committed
Cache queries with globs
For each render and find requests, add a cache item that contains original query. This should be useful for queries with globs. Also make cache expire time configurable.
1 parent d104c59 commit 73fad62

File tree

1 file changed

+37
-9
lines changed

1 file changed

+37
-9
lines changed

main.go

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ var Config = struct {
5252
MaxIdleConnsPerHost int
5353

5454
ConcurrencyLimitPerServer int
55+
ExpireDelaySec int32
5556
}{
5657
MaxProcs: 1,
5758
IntervalSec: 60,
@@ -63,6 +64,8 @@ var Config = struct {
6364

6465
MaxIdleConnsPerHost: 100,
6566

67+
ExpireDelaySec: 10 * 60, // 10 minutes
68+
6669
pathCache: pathCache{ec: expirecache.New(0)},
6770
}
6871

@@ -81,8 +84,10 @@ var Metrics = struct {
8184

8285
Timeouts *expvar.Int
8386

84-
CacheSize expvar.Func
85-
CacheItems expvar.Func
87+
CacheSize expvar.Func
88+
CacheItems expvar.Func
89+
CacheMisses *expvar.Int
90+
CacheHits *expvar.Int
8691
}{
8792
FindRequests: expvar.NewInt("find_requests"),
8893
FindErrors: expvar.NewInt("find_errors"),
@@ -96,6 +101,9 @@ var Metrics = struct {
96101
InfoErrors: expvar.NewInt("info_errors"),
97102

98103
Timeouts: expvar.NewInt("timeouts"),
104+
105+
CacheHits: expvar.NewInt("cache_hits"),
106+
CacheMisses: expvar.NewInt("cache_misses"),
99107
}
100108

101109
// BuildVersion is defined at build and reported at startup and as expvar
@@ -360,7 +368,10 @@ func findHandler(w http.ResponseWriter, req *http.Request) {
360368
var backends []string
361369
var ok bool
362370
if backends, ok = Config.pathCache.get(tld); !ok || backends == nil || len(backends) == 0 {
371+
Metrics.CacheMisses.Add(1)
363372
backends = Config.Backends
373+
} else {
374+
Metrics.CacheHits.Add(1)
364375
}
365376

366377
responses := multiGet(backends, rewrite.RequestURI())
@@ -375,9 +386,12 @@ func findHandler(w http.ResponseWriter, req *http.Request) {
375386
metrics = append(metrics, m...)
376387

377388
// update our cache of which servers have which metrics
389+
allServers := make([]string, 0)
378390
for k, v := range paths {
379391
Config.pathCache.set(k, v)
392+
allServers = append(allServers, v...)
380393
}
394+
Config.pathCache.set(originalQuery, allServers)
381395
}
382396

383397
encodeFindResponse(format, originalQuery, w, metrics)
@@ -467,7 +481,10 @@ func renderHandler(w http.ResponseWriter, req *http.Request) {
467481

468482
// lookup the server list for this metric, or use all the servers if it's unknown
469483
if serverList, ok = Config.pathCache.get(target); !ok || serverList == nil || len(serverList) == 0 {
484+
Metrics.CacheMisses.Add(1)
470485
serverList = Config.Backends
486+
} else {
487+
Metrics.CacheHits.Add(1)
471488
}
472489

473490
responses = append(responses, multiGet(serverList, rewrite.RequestURI())...)
@@ -477,7 +494,10 @@ func renderHandler(w http.ResponseWriter, req *http.Request) {
477494

478495
// lookup the server list for this metric, or use all the servers if it's unknown
479496
if serverList, ok = Config.pathCache.get(target); !ok || serverList == nil || len(serverList) == 0 {
497+
Metrics.CacheMisses.Add(1)
480498
serverList = Config.Backends
499+
} else {
500+
Metrics.CacheHits.Add(1)
481501
}
482502

483503
responses = multiGet(serverList, rewrite.RequestURI())
@@ -490,7 +510,7 @@ func renderHandler(w http.ResponseWriter, req *http.Request) {
490510
return
491511
}
492512

493-
metrics := mergeResponses(req, responses)
513+
servers, metrics := mergeResponses(req, responses)
494514
if metrics == nil {
495515
Metrics.RenderErrors.Add(1)
496516
err := fmt.Sprintf("no decoded responses to merge for req: %s", req.URL.RequestURI())
@@ -499,6 +519,8 @@ func renderHandler(w http.ResponseWriter, req *http.Request) {
499519
return
500520
}
501521

522+
Config.pathCache.set(target, servers)
523+
502524
switch format {
503525
case "protobuf3":
504526
w.Header().Set("Content-Type", contentTypeProtobuf)
@@ -567,8 +589,9 @@ func createRenderResponse(metrics *pb3.MultiFetchResponse, missing interface{})
567589
return response
568590
}
569591

570-
func mergeResponses(req *http.Request, responses []serverResponse) *pb3.MultiFetchResponse {
592+
func mergeResponses(req *http.Request, responses []serverResponse) ([]string, *pb3.MultiFetchResponse) {
571593

594+
servers := make([]string, 0, len(responses))
572595
metrics := make(map[string][]pb3.FetchResponse)
573596

574597
for _, r := range responses {
@@ -583,12 +606,13 @@ func mergeResponses(req *http.Request, responses []serverResponse) *pb3.MultiFet
583606
for _, m := range d.Metrics {
584607
metrics[m.GetName()] = append(metrics[m.GetName()], *m)
585608
}
609+
servers = append(servers, r.server)
586610
}
587611

588612
var multi pb3.MultiFetchResponse
589613

590614
if len(metrics) == 0 {
591-
return nil
615+
return servers, nil
592616
}
593617

594618
for name, decoded := range metrics {
@@ -617,7 +641,7 @@ func mergeResponses(req *http.Request, responses []serverResponse) *pb3.MultiFet
617641
multi.Metrics = append(multi.Metrics, &metric)
618642
}
619643

620-
return &multi
644+
return servers, &multi
621645
}
622646

623647
func mergeValues(req *http.Request, metric *pb3.FetchResponse, decoded []pb3.FetchResponse) {
@@ -698,7 +722,10 @@ func infoHandler(w http.ResponseWriter, req *http.Request) {
698722

699723
// lookup the server list for this metric, or use all the servers if it's unknown
700724
if serverList, ok = Config.pathCache.get(target); !ok || serverList == nil || len(serverList) == 0 {
725+
Metrics.CacheMisses.Add(1)
701726
serverList = Config.Backends
727+
} else {
728+
Metrics.CacheHits.Add(1)
702729
}
703730

704731
format := req.FormValue("format")
@@ -923,6 +950,8 @@ func main() {
923950

924951
graphite.Register(fmt.Sprintf("carbon.zipper.%s.cache_size", hostname), Metrics.CacheSize)
925952
graphite.Register(fmt.Sprintf("carbon.zipper.%s.cache_items", hostname), Metrics.CacheItems)
953+
graphite.Register(fmt.Sprintf("carbon.zipper.%s.cache_hits", hostname), Metrics.CacheHits)
954+
graphite.Register(fmt.Sprintf("carbon.zipper.%s.cache_misses", hostname), Metrics.CacheMisses)
926955

927956
go mstats.Start(*interval)
928957

@@ -1022,14 +1051,13 @@ type pathCache struct {
10221051
}
10231052

10241053
func (p *pathCache) set(k string, v []string) {
1025-
// expire cache entries after 10 minutes
1026-
const expireDelay = 60 * 10
1054+
// expire cache entries after Config.ExpireCache minutes
10271055
var size uint64
10281056
for _, vv := range v {
10291057
size += uint64(len(vv))
10301058
}
10311059

1032-
p.ec.Set(k, v, size, expireDelay)
1060+
p.ec.Set(k, v, size, Config.ExpireDelaySec)
10331061
}
10341062

10351063
func (p *pathCache) get(k string) ([]string, bool) {

0 commit comments

Comments
 (0)