From eb07837b35051b23f18b086806b0d3472ea60cb8 Mon Sep 17 00:00:00 2001 From: tommyo Date: Tue, 28 May 2024 12:34:00 -0400 Subject: [PATCH] add nats --- README.md | 1 + natsstore/README.md | 38 ++++++ natsstore/example_test.go | 45 +++++++ natsstore/go.mod | 20 +++ natsstore/go.sum | 25 ++++ natsstore/natsstore.go | 195 +++++++++++++++++++++++++++ natsstore/natsstore_test.go | 253 ++++++++++++++++++++++++++++++++++++ 7 files changed, 577 insertions(+) create mode 100644 natsstore/README.md create mode 100644 natsstore/example_test.go create mode 100644 natsstore/go.mod create mode 100644 natsstore/go.sum create mode 100644 natsstore/natsstore.go create mode 100644 natsstore/natsstore_test.go diff --git a/README.md b/README.md index 520fe96..e7e78a1 100755 --- a/README.md +++ b/README.md @@ -165,6 +165,7 @@ The session stores currently included are shown in the table below. Please click | [mongodbstore](https://github.com/alexedwards/scs/tree/master/mongodbstore) | MongoDB based session store | | [mssqlstore](https://github.com/alexedwards/scs/tree/master/mssqlstore) | MSSQL based session store | | [mysqlstore](https://github.com/alexedwards/scs/tree/master/mysqlstore) | MySQL based session store | +| [natsstore](https://github.com/alexedwards/scs/tree/master/natsstore) | Nats Jetsream KV based session store | | [pgxstore](https://github.com/alexedwards/scs/tree/master/pgxstore) | PostgreSQL based session store (using the [pgx](https://github.com/jackc/pgx) driver) | | [postgresstore](https://github.com/alexedwards/scs/tree/master/postgresstore) | PostgreSQL based session store (using the [pq](https://github.com/lib/pq) driver) | | [redisstore](https://github.com/alexedwards/scs/tree/master/redisstore) | Redis based session store | diff --git a/natsstore/README.md b/natsstore/README.md new file mode 100644 index 0000000..a2d4719 --- /dev/null +++ b/natsstore/README.md @@ -0,0 +1,38 @@ +# boltstore + +A [NATS JetStream KVStore](https://docs.nats.io/nats-concepts/jetstream/key-value-store) based session store for [SCS](https://github.com/alexedwards/scs). + +## Setup + +You should follow the instructions to [open a NATS KV store](https://natsbyexample.com/examples/kv/intro/go), and pass the database to `natsstore.New()` to establish the session store. + +## Expiring contexts + +You should probably be using the `CtxStore` methods for best context control. If, however, you decide to use the `Store` methods, you **must** set a global context timeout value. + +```go +// set the global context timeout to 100ms +natsstore.New(db, WithTimeout(time.Millisecond * 100)) +``` + +## Expired Session Cleanup + +This package provides a background 'cleanup' goroutine to delete expired session data. This stops the database table from holding on to invalid sessions indefinitely and growing unnecessarily large. By default the cleanup runs every 1 minute. You can change this by using the `WithCleanupInterval` function to initialize your session store. For example: + +```go +// Run a cleanup every 5 minutes. +natsstore.New(db, WithCleanupInterval(5*time.Minute)) + +// Disable the cleanup goroutine by setting the cleanup interval to zero. +natsstore.New(db, WithCleanupInterval(0)) +``` + +### Terminating the Cleanup Goroutine + +It's rare that the cleanup goroutine needs to be terminated --- it is generally intended to be long-lived and run for the lifetime of your application. + +However, there may be occasions when your use of a session store instance is transient. A common example would be using it in a short-lived test function. In this scenario, the cleanup goroutine (which will run forever) will prevent the session store instance from being garbage collected even after the test function has finished. You can prevent this by either disabling the cleanup goroutine altogether (as described above) or by stopping it using the `StopCleanup()` method. + +## Notes + +Currently Nats doesn't allow per-key expiry. In order to support per-key expiry, we take a rather hacky approach to including the expiry in the stored data that is checked on retrieval. Per-key expiry is in the works for release 2.11. Once this is available in the go client we will simplify the code and release as a /v2. diff --git a/natsstore/example_test.go b/natsstore/example_test.go new file mode 100644 index 0000000..f724801 --- /dev/null +++ b/natsstore/example_test.go @@ -0,0 +1,45 @@ +package natsstore + +import ( + "context" + "fmt" + "time" + + "github.com/alexedwards/scs/v2" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +func ExampleRun() { + nc, _ := nats.Connect(natsURL) + defer nc.Drain() + + js, _ := jetstream.New(nc) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + db, _ := js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "scs_example", + Storage: jetstream.MemoryStorage, + }) + + store := New(db, WithTimeout(time.Second), WithCleanupInterval(30*time.Minute)) + + sessionManager := scs.New() + sessionManager.Store = store + + // see the store in action + + putCtx, _ := sessionManager.Load(context.Background(), "") + + sessionManager.Put(putCtx, "foo", "bar") + token, _, _ := sessionManager.Commit(putCtx) + + getCtx, _ := sessionManager.Load(context.Background(), token) + + foo := sessionManager.GetString(getCtx, "foo") + + fmt.Println(foo) + // Output: bar +} diff --git a/natsstore/go.mod b/natsstore/go.mod new file mode 100644 index 0000000..5b16877 --- /dev/null +++ b/natsstore/go.mod @@ -0,0 +1,20 @@ +module github.com/alexedwards/scs/natsstore + +go 1.22.2 + +require ( + github.com/nats-io/nats-server/v2 v2.10.14 + github.com/nats-io/nats.go v1.34.1 +) + +require ( + github.com/klauspost/compress v1.17.7 // indirect + github.com/minio/highwayhash v1.0.2 // indirect + github.com/nats-io/jwt/v2 v2.5.5 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/sys v0.19.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/time v0.5.0 // indirect +) diff --git a/natsstore/go.sum b/natsstore/go.sum new file mode 100644 index 0000000..6b972ac --- /dev/null +++ b/natsstore/go.sum @@ -0,0 +1,25 @@ +github.com/alexedwards/scs/v2 v2.8.0 h1:h31yUYoycPuL0zt14c0gd+oqxfRwIj6SOjHdKRZxhEw= +github.com/alexedwards/scs/v2 v2.8.0/go.mod h1:ToaROZxyKukJKT/xLcVQAChi5k6+Pn1Gvmdl7h3RRj8= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4= +github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/nats-server/v2 v2.10.14 h1:98gPJFOAO2vLdM0gogh8GAiHghwErrSLhugIqzRC+tk= +github.com/nats-io/nats-server/v2 v2.10.14/go.mod h1:a0TwOVBJZz6Hwv7JH2E4ONdpyFk9do0C18TEwxnHdRk= +github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4= +github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= diff --git a/natsstore/natsstore.go b/natsstore/natsstore.go new file mode 100644 index 0000000..271d8bc --- /dev/null +++ b/natsstore/natsstore.go @@ -0,0 +1,195 @@ +package natsstore + +import ( + "context" + "log" + "time" + + "github.com/nats-io/nats.go/encoders/builtin" + "github.com/nats-io/nats.go/jetstream" +) + +var encoder = &builtin.GobEncoder{} + +type expirableValue struct { + Value []byte + Expires time.Time +} + +type NatsStore struct { + db jetstream.KeyValue + + timeout time.Duration + cleanup time.Duration + stopCleanup chan bool +} + +func (ns *NatsStore) get(ctx context.Context, key string, now time.Time) ([]byte, bool, error) { + val, err := ns.db.Get(ctx, key) + if err == jetstream.ErrKeyNotFound { + return nil, false, nil + } + if err != nil { + return nil, false, err + } + + var decoded expirableValue + err = encoder.Decode(key, val.Value(), &decoded) + if err != nil { + return nil, false, err + } + + if decoded.Expires.Before(now) { + err = ns.delete(ctx, key) + return nil, false, err + } + + return decoded.Value, true, nil +} + +func (ns *NatsStore) put(ctx context.Context, key string, val []byte, expiry time.Time) error { + toEncode := expirableValue{ + Value: val, + Expires: expiry, + } + encoded, err := encoder.Encode(key, toEncode) + if err != nil { + return err + } + _, err = ns.db.Put(ctx, key, encoded) + return err +} + +func (ns *NatsStore) delete(ctx context.Context, key string) error { + return ns.db.Purge(ctx, key) +} + +// AllCtx implements scs.IterableCtxStore. +func (ns *NatsStore) AllCtx(ctx context.Context) (map[string][]byte, error) { + // cleanup := false + now := time.Now() + + keys, err := ns.db.ListKeys(ctx, jetstream.IgnoreDeletes()) + defer keys.Stop() + if err != nil { + return nil, err + } + + out := make(map[string][]byte) + for key := range keys.Keys() { + val, available, err := ns.get(ctx, key, now) + if !available || err != nil { + // cleanup = true + continue + } + out[key] = val + } + // ns.db.PurgeDeletes(ctx) + return out, nil +} + +func (ns *NatsStore) StartCleanup() { + ns.stopCleanup = make(chan bool) + ticker := time.NewTicker(ns.cleanup) + for { + select { + case <-ticker.C: + ctx, cancel := context.WithTimeout(context.Background(), ns.cleanup.Truncate(time.Second)) + + err := ns.db.PurgeDeletes(ctx) + if err != nil { + log.Println(err) + } + cancel() + case <-ns.stopCleanup: + ns.stopCleanup = nil + ticker.Stop() + return + } + } +} + +// StopCleanup terminates the background cleanup goroutine for the NetsKVStore +// instance. It's rare to terminate this; generally NetsKVStore instances and +// their cleanup goroutines are intended to be long-lived and run for the lifetime +// of your application. +// +// There may be occasions though when your use of the NetsKVStore is transient. +// An example is creating a new NetsKVStore instance in a test function. In this +// scenario, the cleanup goroutine (which will run forever) will prevent the +// NetsKVStore object from being garbage collected even after the test function +// has finished. You can prevent this by manually calling StopCleanup. +func (bs *NatsStore) StopCleanup() { + if bs.stopCleanup != nil { + bs.stopCleanup <- true + } +} + +// All implements scs.IterableStore. +func (ns *NatsStore) All() (map[string][]byte, error) { + ctx, cancel := context.WithTimeout(context.Background(), ns.timeout) + defer cancel() + return ns.AllCtx(ctx) +} + +// FindCtx implements scs.CtxStore. +func (ns *NatsStore) FindCtx(ctx context.Context, token string) ([]byte, bool, error) { + return ns.get(ctx, token, time.Now()) +} + +// Find implements scs.Store. +func (ns *NatsStore) Find(token string) ([]byte, bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), ns.timeout) + defer cancel() + return ns.FindCtx(ctx, token) +} + +// CommitCtx implements scs.CtxStore. +func (ns *NatsStore) CommitCtx(ctx context.Context, token string, b []byte, expiry time.Time) error { + return ns.put(ctx, token, b, expiry) +} + +// Commit implements scs.Store. +func (ns *NatsStore) Commit(token string, b []byte, expiry time.Time) error { + ctx, cancel := context.WithTimeout(context.Background(), ns.timeout) + defer cancel() + return ns.CommitCtx(ctx, token, b, expiry) +} + +// DeleteCtx implements scs.CtxStore. +func (ns *NatsStore) DeleteCtx(ctx context.Context, token string) error { + return ns.delete(ctx, token) +} + +func (ns *NatsStore) Delete(token string) error { + ctx, cancel := context.WithTimeout(context.Background(), ns.timeout) + defer cancel() + return ns.DeleteCtx(ctx, token) +} + +// New creates a NatsStore instance. db should be a pointer to a jetstreamKeyValue store. +func New(db jetstream.KeyValue, opts ...Opt) *NatsStore { + ns := &NatsStore{db: db, cleanup: time.Minute} + for _, opt := range opts { + opt(ns) + } + if ns.cleanup > 0 { + go ns.StartCleanup() + } + return ns +} + +type Opt func(ns *NatsStore) + +func WithTimeout(t time.Duration) Opt { + return func(ns *NatsStore) { + ns.timeout = t + } +} + +// CleanupFrequency sets how frequently stale session data gets cleaned up. It's 1 min by default +func WithCleanupInterval(t time.Duration) Opt { + return func(ns *NatsStore) { + ns.cleanup = t + } +} diff --git a/natsstore/natsstore_test.go b/natsstore/natsstore_test.go new file mode 100644 index 0000000..3d52b4a --- /dev/null +++ b/natsstore/natsstore_test.go @@ -0,0 +1,253 @@ +package natsstore + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/base64" + "os" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var db jetstream.KeyValue + +var natsURL string + +func localNats() *server.Server { + ns, err := server.NewServer(&server.Options{JetStream: true}) + if err != nil { + panic(err) + } + + go ns.Start() + time.Sleep(time.Second) + return ns +} + +func TestMain(m *testing.M) { + ns := localNats() + defer ns.Shutdown() + + natsURL = ns.ClientURL() + + nc, err := nats.Connect(natsURL) + if err != nil { + panic(err) + } + defer nc.Drain() + + js, err := jetstream.New(nc) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + db, err = js.CreateKeyValue(ctx, jetstream.KeyValueConfig{ + Bucket: "scs_tests", + Storage: jetstream.MemoryStorage, + }) + if err != nil { + panic(err) + } + + results := m.Run() + + os.Exit(results) +} + +type testsData struct { + key string + value []byte + expiry time.Time +} + +func generateData(count int, expiry time.Duration) []testsData { + val := func() []byte { + out := make([]byte, 8) + rand.Read(out) + return out + } + // mimics what scs uses to generate tokens + key := func() string { + return base64.RawURLEncoding.EncodeToString(val()) + } + out := make([]testsData, count) + for i := 0; i < count; i++ { + out[i] = testsData{key(), val(), time.Now().Add(expiry)} + } + return out +} + +func TestCrud(t *testing.T) { + h := New(db, WithTimeout(time.Minute)) + + src := generateData(1, time.Hour)[0] + + t.Run("Commit", func(t *testing.T) { + err := h.Commit(src.key, src.value, src.expiry) + if err != nil { + t.Error(err) + } + }) + + t.Run("Find", func(t *testing.T) { + val, found, err := h.Find(src.key) + if err != nil { + t.Error(err) + } + + if found != true { + t.Error("record not found") + } + + if !bytes.Equal(val, src.value) { + t.Error("values don't match") + } + }) + + t.Run("Delete", func(t *testing.T) { + err := h.Delete(src.key) + if err != nil { + t.Error(err) + } + + _, found, err := h.Find(src.key) + if err != nil { + t.Error(err) + } + + if found != false { + t.Error("record not deleted") + } + }) +} + +func TestCrudCtx(t *testing.T) { + h := New(db) + + src := generateData(1, time.Hour)[0] + + t.Run("Commit", func(t *testing.T) { + err := h.CommitCtx(context.TODO(), src.key, src.value, src.expiry) + if err != nil { + t.Error(err) + } + }) + + t.Run("Find", func(t *testing.T) { + val, found, err := h.FindCtx(context.TODO(), src.key) + if err != nil { + t.Error(err) + } + + if found != true { + t.Error("record not found") + } + + if !bytes.Equal(val, src.value) { + t.Error("values don't match") + } + }) + + t.Run("Delete", func(t *testing.T) { + ctx := context.TODO() + err := h.DeleteCtx(ctx, src.key) + if err != nil { + t.Error(err) + } + + _, found, err := h.FindCtx(ctx, src.key) + if err != nil { + t.Error(err) + } + + if found != false { + t.Error("record not deleted") + } + }) +} + +func TestAll(t *testing.T) { + h := New(db, WithTimeout(time.Second)) + src := generateData(2, time.Hour) + + for _, row := range src { + h.Commit(row.key, row.value, row.expiry) + } + + t.Run("basic", func(t *testing.T) { + all, err := h.All() + if err != nil { + t.Error(err) + } + + if len(all) != len(src) { + t.Error("count of All is incorrect") + } + }) + t.Run("with deletes", func(t *testing.T) { + h.Delete(src[0].key) + + all, err := h.All() + if err != nil { + t.Error(err) + } + + if len(all) != len(src)-1 { + t.Error("count of All is incorrect") + } + }) + t.Run("with expiry", func(t *testing.T) { + before, _ := h.All() + + shorties := generateData(2, time.Second*2) + for _, row := range shorties { + h.Commit(row.key, row.value, row.expiry) + } + + after, _ := h.All() + if len(after) != len(before)+len(shorties) { + t.Error("unexpected lengths from All") + } + + time.Sleep(time.Second * 3) + + after, _ = h.All() + if len(after) != len(before) { + t.Error("unexpected lengths from All after expiry") + } + }) + + // cleanup + all, _ := h.All() + for key := range all { + h.Delete(key) + } +} + +func TestCleanup(t *testing.T) { + h := New(db, WithTimeout(time.Second), WithCleanupInterval(time.Second)) + src := generateData(2, time.Hour) + + // uneccessary + defer h.StopCleanup() + + for _, row := range src { + h.Commit(row.key, row.value, row.expiry) + } + + time.Sleep(2 * time.Second) + + for _, row := range src { + h.Delete(row.key) + } + + time.Sleep(2 * time.Second) +}