Skip to content

Commit 0b85194

Browse files
committed
Moved daemon from cluster package to groupcache package
1 parent cbe1d98 commit 0b85194

File tree

9 files changed

+161
-161
lines changed

9 files changed

+161
-161
lines changed

README.md

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -83,28 +83,23 @@ import (
8383
"log/slog"
8484

8585
"github.com/groupcache/groupcache-go/v3"
86-
"github.com/groupcache/groupcache-go/v3/cluster"
87-
"github.com/groupcache/groupcache-go/v3/data"
88-
"github.com/segmentio/fasthash/fnv1"
86+
"github.com/groupcache/groupcache-go/v3/data"
87+
"github.com/groupcache/groupcache-go/v3/transport/peer"
8988
)
9089

9190
func ExampleUsage() {
9291
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
9392
defer cancel()
9493

95-
// Starts an instance of groupcache with the provided transport
96-
d, err := cluster.SpawnDaemon(ctx, "192.168.1.1:8080", groupcache.Options{
97-
// If transport is nil, defaults to transport.HttpTransport
98-
Transport: nil,
99-
HashFn: fnv1.HashBytes64,
100-
Logger: slog.Default(),
101-
Replicas: 50,
102-
})
94+
// SpawnDaemon is a convenience function which Starts an instance of groupcache
95+
// with the provided transport and listens for groupcache HTTP requests on the address provided.
96+
d, err := groupcache.SpawnDaemon(ctx, "192.168.1.1:8080", groupcache.Options{})
10397
if err != nil {
10498
log.Fatal("while starting server on 192.168.1.1:8080")
10599
}
106100

107-
// Manually set peers, or use some discovery system to identify peers
101+
// Manually set peers, or use some discovery system to identify peers.
102+
// It is safe to call SetPeers() whenever the peer topology changes
108103
d.SetPeers(ctx, []peer.Info{
109104
{
110105
Address: "192.168.1.1:8080",
@@ -121,8 +116,8 @@ func ExampleUsage() {
121116
})
122117

123118
// Create a new group cache with a max cache size of 3MB
124-
group := d.GroupCache.NewGroup("users", 3000000, groupcache.GetterFunc(
125-
func(ctx context.Context, id string, dest groupcache.Sink) error {
119+
group := d.NewGroup("users", 3000000, groupcache.GetterFunc(
120+
func(ctx context.Context, id string, dest data.Sink) error {
126121

127122
// Returns a protobuf struct `User`
128123
user, err := fetchUserFromMongo(ctx, id)
@@ -140,7 +135,7 @@ func ExampleUsage() {
140135
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
141136
defer cancel()
142137

143-
if err := group.Get(ctx, "12345", groupcache.ProtoSink(&user)); err != nil {
138+
if err := group.Get(ctx, "12345", data.ProtoSink(&user)); err != nil {
144139
log.Fatal(err)
145140
}
146141

@@ -155,7 +150,7 @@ func ExampleUsage() {
155150
log.Fatal(err)
156151
}
157152

158-
// Shutdown the daemon
153+
// Shutdown the instance and HTTP listeners
159154
d.Shutdown(ctx)
160155
}
161156
```
@@ -238,6 +233,11 @@ called at any point during `groupcache.Instance` operation as peers leave or joi
238233

239234
If `SetPeers()` is not called, then the `groupcache.Instance` will operate as a local only cache.
240235

236+
### groupcache.Daemon
237+
This is a convenience struct which encapsulates a `groupcache.Instance` to simplify starting and stopping an instance and
238+
the associated transport. Calling `groupcache.SpawnDaemon()` calls `SpawnTransport()` on the provided transport to
239+
listen for incoming requests.
240+
241241
### data.Group
242242
Holds the cache that makes up the "group" which can be shared with other instances of group cache. Each
243243
`groupcache.Instance` must create the same group using the same group name. Group names are how a "group" cache is
@@ -269,17 +269,8 @@ which peer in the cluster is our instance, `Instance.SetPeers()` will return an
269269
Contains data structures used by groupcache to serialize data between remote instances.
270270

271271
### cluster package
272-
Is a convenience package containing functions to easily spawn and shutdown groupcache instances (called daemons) or to
273-
create a local cluster of group cache instances using the default `transport.HttpTransport`.
274-
275-
**SpawnDaemon()** Spawns a single instance of groupcache using the config provided. The returned *Daemon has methods which
276-
make interacting with the groupcache instance simple.
277-
278-
```go
279-
// Starts an instance of groupcache with the provided transport
280-
d, _ := cluster.SpawnDaemon(ctx, "192.168.1.1:8080", groupcache.Options{})
281-
defer d.Shutdown(context.Background())
282-
```
272+
Is a convenience package containing functions to easily spawn and shutdown a cluster of groupcache instances
273+
(called daemons).
283274

284275
**Start()** and **StartWith()** starts a local cluster of groupcache daemons suitable for testing. Users who wish to
285276
test groupcache in their own project test suites can use these methods to start and stop clusters.

cluster/cluster.go

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,8 @@ limitations under the License.
1616

1717
/*
1818
Package cluster contains convince functions which make managing the creation of multiple groupcache instances
19-
simple.
20-
21-
# SpawnDaemon()
22-
23-
Spawns a single instance of groupcache using the config provided. The returned *Daemon has methods which
24-
make interacting with the groupcache instance simple.
25-
26-
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
27-
defer cancel()
28-
29-
// Starts an instance of groupcache with the provided transport
30-
d, err := cluster.SpawnDaemon(ctx, "192.168.1.1:8080", groupcache.Options{})
31-
if err != nil {
32-
log.Fatal("while starting server on 192.168.1.1:8080")
33-
}
34-
35-
d.Shutdown(context.Background())
36-
37-
# Start() and StartWith()
38-
39-
Starts a local cluster of groupcache daemons suitable for testing. Users who wish to test groupcache in their
40-
own project test suites can use these methods to start and stop clusters. See cluster_test.go for more examples.
19+
simple. To start a local cluster of groupcache daemons suitable for testing you can call cluster.Start()
20+
or cluster.StartWith(). See cluster_test.go for more examples.
4121
4222
err := cluster.Start(context.Background(), 2, groupcache.Options{})
4323
require.NoError(t, err)
@@ -59,7 +39,7 @@ import (
5939
"github.com/groupcache/groupcache-go/v3/transport/peer"
6040
)
6141

62-
var _daemons []*Daemon
42+
var _daemons []*groupcache.Daemon
6343
var _peers []peer.Info
6444

6545
// ListPeers returns a list of all peers in the cluster
@@ -68,12 +48,12 @@ func ListPeers() []peer.Info {
6848
}
6949

7050
// ListDaemons returns a list of all daemons in the cluster
71-
func ListDaemons() []*Daemon {
51+
func ListDaemons() []*groupcache.Daemon {
7252
return _daemons
7353
}
7454

7555
// DaemonAt returns a specific daemon
76-
func DaemonAt(idx int) *Daemon {
56+
func DaemonAt(idx int) *groupcache.Daemon {
7757
return _daemons[idx]
7858
}
7959

@@ -83,8 +63,12 @@ func PeerAt(idx int) peer.Info {
8363
}
8464

8565
// FindOwningDaemon finds the daemon which owns the key provided
86-
func FindOwningDaemon(key string) *Daemon {
87-
c, isRemote := _daemons[0].GroupCache.PickPeer(key)
66+
func FindOwningDaemon(key string) *groupcache.Daemon {
67+
if len(_daemons) == 0 {
68+
panic("'_daemon' is empty; start a cluster with Start() or StartWith()")
69+
}
70+
71+
c, isRemote := _daemons[0].GetInstance().PickPeer(key)
8872
if !isRemote {
8973
return _daemons[0]
9074
}
@@ -124,7 +108,7 @@ func StartWith(ctx context.Context, peers []peer.Info, opts groupcache.Options)
124108
}
125109

126110
for _, p := range peers {
127-
d, err := SpawnDaemon(ctx, p.Address, opts)
111+
d, err := groupcache.SpawnDaemon(ctx, p.Address, opts)
128112
if err != nil {
129113
return fmt.Errorf("StartWith: while starting daemon for '%s': %w", p.Address, err)
130114
}
@@ -158,7 +142,7 @@ func Restart(ctx context.Context) error {
158142
if err := _daemons[i].Start(ctx); err != nil {
159143
return err
160144
}
161-
_ = _daemons[i].GroupCache.SetPeers(ctx, _peers)
145+
_ = _daemons[i].GetInstance().SetPeers(ctx, _peers)
162146
}
163147
return nil
164148
}

cluster/daemon.go

Lines changed: 0 additions & 88 deletions
This file was deleted.

daemon.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
Copyright Derrick J Wippler
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package groupcache
18+
19+
import (
20+
"context"
21+
"log/slog"
22+
23+
"github.com/groupcache/groupcache-go/v3/data"
24+
"github.com/groupcache/groupcache-go/v3/transport"
25+
"github.com/groupcache/groupcache-go/v3/transport/peer"
26+
)
27+
28+
// Daemon is an instance of groupcache
29+
type Daemon struct {
30+
instance *Instance
31+
opts Options
32+
address string
33+
}
34+
35+
// SpawnDaemon starts a new instance of daemon with the config provided
36+
func SpawnDaemon(ctx context.Context, address string, opts Options) (*Daemon, error) {
37+
if opts.Logger == nil {
38+
opts.Logger = slog.Default()
39+
}
40+
41+
if opts.Transport == nil {
42+
opts.Transport = transport.NewHttpTransport(transport.HttpTransportOptions{})
43+
}
44+
45+
daemon := &Daemon{
46+
address: address,
47+
opts: opts,
48+
}
49+
50+
return daemon, daemon.Start(ctx)
51+
}
52+
53+
func (d *Daemon) Start(ctx context.Context) error {
54+
d.instance = New(d.opts)
55+
return d.opts.Transport.SpawnServer(ctx, d.address)
56+
}
57+
58+
// NewGroup is a convenience method which calls NewGroup on the instance associated with this daemon.
59+
func (d *Daemon) NewGroup(name string, cacheBytes int64, getter Getter) (data.Group, error) {
60+
return d.instance.NewGroup(name, cacheBytes, getter)
61+
}
62+
63+
// GetGroup is a convenience method which calls GetGroup on the instance associated with this daemon
64+
func (d *Daemon) GetGroup(name string) data.Group {
65+
return d.instance.GetGroup(name)
66+
}
67+
68+
// RemoveGroup is a convenience method which calls RemoveGroup on the instance associated with this daemon
69+
func (d *Daemon) RemoveGroup(name string) {
70+
d.instance.RemoveGroup(name)
71+
}
72+
73+
// GetInstance returns the current groupcache instance associated with this daemon
74+
func (d *Daemon) GetInstance() *Instance {
75+
return d.instance
76+
}
77+
78+
// SetPeers is a convenience method which calls SetPeers on the instance associated with this daemon. In
79+
// addition, it finds and marks this instance as self by asking the transport for it's listening address
80+
// before calling SetPeers() on the instance. If this is not desirable, call Daemon.GetInstance().SetPeers()
81+
// instead.
82+
func (d *Daemon) SetPeers(ctx context.Context, src []peer.Info) error {
83+
dest := make([]peer.Info, len(src))
84+
for idx := 0; idx < len(src); idx++ {
85+
dest[idx] = src[idx]
86+
if dest[idx].Address == d.ListenAddress() {
87+
dest[idx].IsSelf = true
88+
}
89+
}
90+
return d.instance.SetPeers(ctx, dest)
91+
}
92+
93+
// MustClient is a convenience method which creates a new client for this instance. This method will
94+
// panic if transport.NewClient() returns an error.
95+
func (d *Daemon) MustClient() peer.Client {
96+
c, err := d.opts.Transport.NewClient(context.Background(), peer.Info{Address: d.ListenAddress()})
97+
if err != nil {
98+
panic(err)
99+
}
100+
return c
101+
}
102+
103+
// ListenAddress returns the address this instance is listening on
104+
func (d *Daemon) ListenAddress() string {
105+
return d.opts.Transport.ListenAddress()
106+
}
107+
108+
// Shutdown attempts a clean shutdown of the daemon and all related resources.
109+
func (d *Daemon) Shutdown(ctx context.Context) error {
110+
return d.opts.Transport.ShutdownServer(ctx)
111+
}

data/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"time"
2222
)
2323

24+
// TODO: Attempt to move this into `transport` package and Sink back into `groupcache` package
25+
2426
type Group interface {
2527
Set(context.Context, string, []byte, time.Time, bool) error
2628
Get(context.Context, string, Sink) error

0 commit comments

Comments
 (0)