Skip to content

Scope producer and ticker names with node name #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pool/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"os"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -119,9 +120,9 @@ func testContext(t *testing.T) context.Context {
return log.Context(context.Background(), log.WithDebug())
}

func testLogContext(t *testing.T) (context.Context, *bytes.Buffer) {
func testLogContext(t *testing.T) (context.Context, *buffer) {
t.Helper()
var buf bytes.Buffer
var buf buffer
return log.Context(context.Background(), log.WithOutput(&buf), log.WithFormat(log.FormatText), log.WithDebug()), &buf
}

Expand Down Expand Up @@ -159,3 +160,21 @@ type workerMock struct {
func (w *workerMock) Start(job *Job) error { return w.startFunc(job) }
func (w *workerMock) Stop(key string) error { return w.stopFunc(key) }
func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) }

// buffer is a goroutine safe bytes.Buffer
type buffer struct {
buffer bytes.Buffer
mutex sync.Mutex
}

func (s *buffer) Write(p []byte) (n int, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.buffer.Write(p)
}

func (s *buffer) String() string {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.buffer.String()
}
9 changes: 5 additions & 4 deletions pool/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,17 @@ var ErrScheduleStop = fmt.Errorf("stop")
// returns ErrScheduleStop. Plan is called on only one of the nodes that
// scheduled the same producer.
func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error {
jobMap, err := rmap.Join(ctx, producer.Name(), node.rdb, rmap.WithLogger(node.logger))
name := node.Name + ":" + producer.Name()
jobMap, err := rmap.Join(ctx, name, node.rdb, rmap.WithLogger(node.logger))
if err != nil {
return fmt.Errorf("failed to join job map: %w", err)
return fmt.Errorf("failed to join job map %s: %w", name, err)
}
ticker, err := node.NewTicker(ctx, producer.Name(), interval)
if err != nil {
return fmt.Errorf("failed to create ticker: %w", err)
return fmt.Errorf("failed to create ticker %s: %w", name, err)
}
sched := &scheduler{
name: producer.Name(),
name: name,
interval: interval,
producer: producer,
node: node,
Expand Down
26 changes: 19 additions & 7 deletions pool/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pool

import (
"sync"
"testing"
"time"

Expand All @@ -20,12 +21,16 @@ func TestSchedule(t *testing.T) {
worker = newTestWorker(t, ctx, node)
d = 10 * time.Millisecond
iter = 0
lock sync.Mutex
)
defer cleanup(t, rdb, false, testName)

inc := func() { lock.Lock(); iter++; lock.Unlock() }
it := func() int { lock.Lock(); defer lock.Unlock(); return iter }

producer := newTestProducer(testName, func() (*JobPlan, error) {
iter++
switch iter {
inc()
switch it() {
case 1:
assert.Equal(t, 0, numJobs(t, worker), "unexpected number of jobs")
// First iteration: start a job
Expand Down Expand Up @@ -60,17 +65,19 @@ func TestSchedule(t *testing.T) {
// Seventh iteration: stop schedule
return nil, ErrScheduleStop
}
t.Errorf("unexpected iteration %d", iter)
t.Errorf("unexpected iteration %d", it())
return nil, nil
})

// Observe call to reset
jobMap, err := rmap.Join(ctx, testName, rdb)
jobMap, err := rmap.Join(ctx, testName+":"+testName, rdb)
require.NoError(t, err)
var reset bool
c := jobMap.Subscribe()
defer jobMap.Unsubscribe(c)
done := make(chan struct{})
go func() {
defer close(done)
for ev := range c {
if ev == rmap.EventReset {
reset = true
Expand All @@ -82,9 +89,14 @@ func TestSchedule(t *testing.T) {
err = node.Schedule(ctx, producer, d)
require.NoError(t, err)

jobMap.Subscribe()
assert.Eventually(t, func() bool { return iter == 7 }, max, delay, "schedule should have stopped")
assert.Eventually(t, func() bool { return reset }, max, delay, "job map should have been reset")
assert.Eventually(t, func() bool { return it() == 7 }, max, delay, "schedule should have stopped")
select {
case <-done:
reset = true
case <-time.After(time.Second):
break
}
assert.True(t, reset, "job map should have been reset")
assert.NotContains(t, buf.String(), "level=error", "unexpected logged error")
}

Expand Down
1 change: 1 addition & 0 deletions pool/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
if node.clientOnly {
return nil, fmt.Errorf("cannot create ticker on client-only node")
}
name = node.Name + ":" + name
o := parseTickerOptions(opts...)
logger := o.logger
if logger == nil {
Expand Down