Skip to content

Commit dbf81d0

Browse files
lxfontesjrallison
authored andcommitted
Make sure 'Fetcher' doesn't have a lingering job upon exit (#79)
1 parent d60d79d commit dbf81d0

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

fetcher.go

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,40 +54,32 @@ func (f *fetch) processOldMessages() {
5454
}
5555

5656
func (f *fetch) Fetch() {
57-
messages := make(chan string)
58-
5957
f.processOldMessages()
6058

61-
go func(c chan string) {
59+
go func() {
6260
for {
6361
// f.Close() has been called
6462
if f.Closed() {
6563
break
6664
}
6765
<-f.Ready()
68-
f.tryFetchMessage(c)
66+
f.tryFetchMessage()
6967
}
70-
}(messages)
71-
72-
f.handleMessages(messages)
73-
}
68+
}()
7469

75-
func (f *fetch) handleMessages(messages chan string) {
7670
for {
7771
select {
78-
case message := <-messages:
79-
f.sendMessage(message)
8072
case <-f.stop:
8173
// Stop the redis-polling goroutine
8274
close(f.closed)
8375
// Signal to Close() that the fetcher has stopped
8476
close(f.exit)
85-
return
77+
break
8678
}
8779
}
8880
}
8981

90-
func (f *fetch) tryFetchMessage(messages chan string) {
82+
func (f *fetch) tryFetchMessage() {
9183
conn := Config.Pool.Get()
9284
defer conn.Close()
9385

@@ -100,7 +92,7 @@ func (f *fetch) tryFetchMessage(messages chan string) {
10092
time.Sleep(1 * time.Second)
10193
}
10294
} else {
103-
messages <- message
95+
f.sendMessage(message)
10496
}
10597
}
10698

manager_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package workers
33
import (
44
"fmt"
55
"sync"
6+
"time"
67

78
"github.com/customerio/gospec"
89
. "github.com/customerio/gospec"
@@ -99,6 +100,38 @@ func ManagerSpec(c gospec.Context) {
99100
c.Expect(len, Equals, 0)
100101
})
101102

103+
c.Specify("drain queue completely on exit", func() {
104+
sentinel, _ := NewMsg("{\"foo\":\"bar2\",\"args\":\"sentinel\"}")
105+
106+
drained := false
107+
108+
slowJob := (func(message *Msg) {
109+
if message.ToJson() == sentinel.ToJson() {
110+
drained = true
111+
} else {
112+
processed <- message.Args()
113+
}
114+
115+
time.Sleep(1 * time.Second)
116+
})
117+
manager := newManager("manager1", slowJob, 10)
118+
119+
for i := 0; i < 9; i++ {
120+
conn.Do("lpush", "prod:queue:manager1", message.ToJson())
121+
}
122+
conn.Do("lpush", "prod:queue:manager1", sentinel.ToJson())
123+
124+
manager.start()
125+
for i := 0; i < 9; i++ {
126+
<-processed
127+
}
128+
manager.quit()
129+
130+
len, _ := redis.Int(conn.Do("llen", "prod:queue:manager1"))
131+
c.Expect(len, Equals, 0)
132+
c.Expect(drained, Equals, true)
133+
})
134+
102135
c.Specify("per-manager middlwares are called separately, global middleware is called in each manager", func() {
103136
mid1 := customMid{Base: "1"}
104137
mid2 := customMid{Base: "2"}

0 commit comments

Comments
 (0)