Skip to content

Commit fb0e73f

Browse files
authored
Log panics occuring in event handler goroutines (#41)
1 parent a7b5785 commit fb0e73f

File tree

10 files changed

+220
-188
lines changed

10 files changed

+220
-188
lines changed

pool/node.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,11 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp
208208

209209
p.wg.Add(5)
210210
pch := poolSink.Subscribe()
211-
go p.handlePoolEvents(pch) // handleXXX handles streaming events
212-
go p.handleNodeEvents(nch)
213-
go p.manageWorkers(ctx) // manageXXX handles map updates
214-
go p.manageShutdown(ctx)
215-
go p.manageInactiveWorkers(ctx)
211+
pulse.Go(ctx, func() { p.handlePoolEvents(pch) }) // handleXXX handles streaming events
212+
pulse.Go(ctx, func() { p.handleNodeEvents(nch) })
213+
pulse.Go(ctx, func() { p.manageWorkers(ctx) }) // manageXXX handles map updates
214+
pulse.Go(ctx, func() { p.manageShutdown(ctx) })
215+
pulse.Go(ctx, func() { p.manageInactiveWorkers(ctx) })
216216
return p, nil
217217
}
218218

pool/scheduler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval t
9999
if err := sched.stopJobs(ctx, plan); err != nil {
100100
return fmt.Errorf("failed to stop jobs: %w", err)
101101
}
102-
go sched.scheduleJobs(ctx, ticker, producer)
103-
go sched.handleStop(ctx)
102+
pulse.Go(ctx, func() { sched.scheduleJobs(ctx, ticker, producer) })
103+
pulse.Go(ctx, func() { sched.handleStop(ctx) })
104104
return nil
105105
}
106106

pool/ticker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
6767
}
6868
t.initTimer()
6969
t.wg.Add(1)
70-
go t.handleEvents()
70+
pulse.Go(ctx, func() { t.handleEvents() })
7171
return t, nil
7272
}
7373

pool/worker.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
127127
"worker_shutdown_ttl", w.workerShutdownTTL)
128128

129129
w.wg.Add(2)
130-
go w.handleEvents(reader.Subscribe())
131-
go w.keepAlive(ctx)
130+
pulse.Go(ctx, func() { w.handleEvents(reader.Subscribe()) })
131+
pulse.Go(ctx, func() { w.keepAlive(ctx) })
132132

133133
return w, nil
134134
}

pulse/goroutine.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package pulse
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"runtime/debug"
7+
8+
"goa.design/clue/log"
9+
)
10+
11+
// Go runs the given function in a separate goroutine and recovers from any panic,
12+
// logging the panic message and stack trace.
13+
//
14+
// Usage:
15+
//
16+
// Go(ctx, func() {
17+
// // Your code here
18+
// })
19+
func Go(ctx context.Context, f func()) {
20+
go func() {
21+
defer func(ctx context.Context) {
22+
if r := recover(); r != nil {
23+
panicErr := fmt.Errorf("Panic recovered: %v\n%s", r, debug.Stack())
24+
log.Error(ctx, panicErr)
25+
}
26+
}(ctx)
27+
f()
28+
}()
29+
}

rmap/map.go

+1-172
Original file line numberDiff line numberDiff line change
@@ -58,177 +58,6 @@ const (
5858
EventReset
5959
)
6060

61-
var (
62-
// luaAppend is the Lua script used to append an item to an array key and
63-
// return its new value.
64-
luaAppend = redis.NewScript(`
65-
local v = redis.call("HGET", KEYS[1], ARGV[1])
66-
67-
-- If the value exists, append the new value, otherwise assign ARGV[2] directly
68-
v = (v and v .. "," .. ARGV[2]) or ARGV[2]
69-
70-
-- Set the updated value in the hash and publish the change
71-
redis.call("HSET", KEYS[1], ARGV[1], v)
72-
redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
73-
74-
return v
75-
`)
76-
77-
// luaAppendUnique is the Lua script used to append an item to a set and return
78-
// the result.
79-
luaAppendUnique = redis.NewScript(`
80-
local v = redis.call("HGET", KEYS[1], ARGV[1])
81-
local newValues = {}
82-
local changed = false
83-
84-
-- Split ARGV[2] into a table of new values
85-
for value in string.gmatch(ARGV[2], "[^,]+") do
86-
table.insert(newValues, value)
87-
end
88-
89-
-- If the value exists, process it, else set it directly
90-
if v then
91-
local existingValues = {}
92-
-- Split existing values into a table
93-
for value in string.gmatch(v, "[^,]+") do
94-
existingValues[value] = true
95-
end
96-
97-
-- Append unique new values to v
98-
for _, newValue in ipairs(newValues) do
99-
if not existingValues[newValue] then
100-
v = (v == "") and newValue or v .. "," .. newValue
101-
changed = true
102-
end
103-
end
104-
else
105-
v = table.concat(newValues, ",")
106-
changed = true
107-
end
108-
109-
-- If changes were made, update the hash and publish the event
110-
if changed then
111-
redis.call("HSET", KEYS[1], ARGV[1], v)
112-
redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
113-
end
114-
115-
return v
116-
`)
117-
118-
// luaDelete is the Lua script used to delete a key and return its previous
119-
// value.
120-
luaDelete = redis.NewScript(`
121-
local v = redis.call("HGET", KEYS[1], ARGV[1])
122-
redis.call("HDEL", KEYS[1], ARGV[1])
123-
redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
124-
return v
125-
`)
126-
127-
// luaIncr is the Lua script used to increment a key and return the new value.
128-
luaIncr = redis.NewScript(`
129-
redis.call("HINCRBY", KEYS[1], ARGV[1], ARGV[2])
130-
local v = redis.call("HGET", KEYS[1], ARGV[1])
131-
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..v)
132-
return v
133-
`)
134-
135-
// luaRemove is the Lua script used to remove items from an array value and
136-
// return the result along with a flag indicating if any value was removed.
137-
luaRemove = redis.NewScript(`
138-
local v = redis.call("HGET", KEYS[1], ARGV[1])
139-
local removed = false
140-
141-
if v then
142-
-- Create a set of current values
143-
local curr = {}
144-
for s in string.gmatch(v, "[^,]+") do
145-
curr[s] = true
146-
end
147-
148-
-- Remove specified values
149-
for s in string.gmatch(ARGV[2], "[^,]+") do
150-
if curr[s] then
151-
curr[s] = nil
152-
removed = true
153-
end
154-
end
155-
156-
-- Collect the remaining values
157-
local newValues = {}
158-
for key, _ in pairs(curr) do
159-
table.insert(newValues, key)
160-
end
161-
162-
-- Update the hash or delete the key if empty
163-
if #newValues == 0 then
164-
redis.call("HDEL", KEYS[1], ARGV[1])
165-
v = ""
166-
else
167-
v = table.concat(newValues, ",")
168-
redis.call("HSET", KEYS[1], ARGV[1], v)
169-
end
170-
171-
-- Publish the result
172-
redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
173-
end
174-
175-
return {v, removed}
176-
`)
177-
178-
// luaReset is the Lua script used to reset the map.
179-
luaReset = redis.NewScript(`
180-
redis.call("DEL", KEYS[1])
181-
redis.call("PUBLISH", KEYS[2], "*=")
182-
`)
183-
184-
// luaSet is the Lua script used to set a key and return its previous value. We
185-
// use Lua scripts to publish notifications "at the same time" and preserve the
186-
// order of operations (scripts are run atomically within Redis).
187-
luaSet = redis.NewScript(`
188-
local v = redis.call("HGET", KEYS[1], ARGV[1])
189-
redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
190-
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[2])
191-
return v
192-
`)
193-
194-
// luaTestAndDel is the Lua script used to delete a key if it has a specific value.
195-
luaTestAndDel = redis.NewScript(`
196-
local v = redis.call("HGET", KEYS[1], ARGV[1])
197-
if v == ARGV[2] then
198-
redis.call("HDEL", KEYS[1], ARGV[1])
199-
redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
200-
end
201-
return v
202-
`)
203-
204-
// luaTestAndReset is the Lua script used to reset the map if all the given keys
205-
// have the given values.
206-
luaTestAndReset = redis.NewScript(`
207-
local hash = KEYS[1]
208-
local n = (#ARGV - 1) / 2
209-
210-
for i = 2, n + 1 do
211-
if redis.call("HGET", hash, ARGV[i]) ~= ARGV[i + n] then
212-
return 0
213-
end
214-
end
215-
216-
redis.call("DEL", hash)
217-
redis.call("PUBLISH", KEYS[2], "*=")
218-
return 1
219-
`)
220-
221-
// luaTestAndSet is the Lua script used to set a key if it has a specific value.
222-
luaTestAndSet = redis.NewScript(`
223-
local v = redis.call("HGET", KEYS[1], ARGV[1])
224-
if v == ARGV[2] then
225-
redis.call("HSET", KEYS[1], ARGV[1], ARGV[3])
226-
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[3])
227-
end
228-
return v
229-
`)
230-
)
231-
23261
// Join retrieves the content of the replicated map with the given name and
23362
// subscribes to updates. The local content is eventually consistent across all
23463
// nodes that join the replicated map with the same name.
@@ -274,7 +103,7 @@ func Join(ctx context.Context, name string, rdb *redis.Client, opts ...MapOption
274103

275104
// read updates
276105
sm.wait.Add(1)
277-
go sm.run()
106+
pulse.Go(ctx, sm.run)
278107

279108
sm.logger.Info("joined")
280109
return sm, nil

0 commit comments

Comments
 (0)