Skip to content

Log panics occurring in event handler goroutines #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions pool/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ func AddNode(ctx context.Context, name string, rdb *redis.Client, opts ...NodeOp

p.wg.Add(5)
pch := poolSink.Subscribe()
go p.handlePoolEvents(pch) // handleXXX handles streaming events
go p.handleNodeEvents(nch)
go p.manageWorkers(ctx) // manageXXX handles map updates
go p.manageShutdown(ctx)
go p.manageInactiveWorkers(ctx)
pulse.Go(ctx, func() { p.handlePoolEvents(pch) }) // handleXXX handles streaming events
pulse.Go(ctx, func() { p.handleNodeEvents(nch) })
pulse.Go(ctx, func() { p.manageWorkers(ctx) }) // manageXXX handles map updates
pulse.Go(ctx, func() { p.manageShutdown(ctx) })
pulse.Go(ctx, func() { p.manageInactiveWorkers(ctx) })
return p, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pool/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval t
if err := sched.stopJobs(ctx, plan); err != nil {
return fmt.Errorf("failed to stop jobs: %w", err)
}
go sched.scheduleJobs(ctx, ticker, producer)
go sched.handleStop(ctx)
pulse.Go(ctx, func() { sched.scheduleJobs(ctx, ticker, producer) })
pulse.Go(ctx, func() { sched.handleStop(ctx) })
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pool/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
}
t.initTimer()
t.wg.Add(1)
go t.handleEvents()
pulse.Go(ctx, func() { t.handleEvents() })
return t, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pool/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ func newWorker(ctx context.Context, node *Node, h JobHandler) (*Worker, error) {
"worker_shutdown_ttl", w.workerShutdownTTL)

w.wg.Add(2)
go w.handleEvents(reader.Subscribe())
go w.keepAlive(ctx)
pulse.Go(ctx, func() { w.handleEvents(reader.Subscribe()) })
pulse.Go(ctx, func() { w.keepAlive(ctx) })

return w, nil
}
Expand Down
29 changes: 29 additions & 0 deletions pulse/goroutine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pulse

import (
"context"
"fmt"
"runtime/debug"

"goa.design/clue/log"
)

// Go starts f in a new goroutine. If f panics, the panic is recovered inside
// that goroutine and reported through log.Error together with the stack trace
// captured at recovery time, so the panic does not propagate further.
//
// Usage:
//
//	Go(ctx, func() {
//		// Your code here
//	})
func Go(ctx context.Context, f func()) {
	go func() {
		defer func() {
			r := recover()
			if r == nil {
				// f returned normally, nothing to report.
				return
			}
			// debug.Stack is evaluated here, inside the deferred function,
			// so the trace is taken while the panicking frames are still
			// on the stack.
			log.Error(ctx, fmt.Errorf("Panic recovered: %v\n%s", r, debug.Stack()))
		}()
		f()
	}()
}
173 changes: 1 addition & 172 deletions rmap/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,177 +58,6 @@ const (
EventReset
)

var (
// luaAppend is the Lua script used to append an item to an array key and
// return its new value.
//
// Arguments as read by the script:
//   - KEYS[1]: hash holding the map content
//   - KEYS[2]: channel the change notification is published on
//   - ARGV[1]: hash field to update
//   - ARGV[2]: text to append
//
// When the field already holds a value the result is "<old>,<ARGV[2]>",
// otherwise it is ARGV[2] alone. No de-duplication is performed. The script
// stores the result, publishes "<field>=<result>" and returns the result.
luaAppend = redis.NewScript(`
local v = redis.call("HGET", KEYS[1], ARGV[1])

-- If the value exists, append the new value, otherwise assign ARGV[2] directly
v = (v and v .. "," .. ARGV[2]) or ARGV[2]

-- Set the updated value in the hash and publish the change
redis.call("HSET", KEYS[1], ARGV[1], v)
redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)

return v
`)

// luaAppendUnique is the Lua script used to append items to a set and return
// the result.
//
// Arguments as read by the script:
//   - KEYS[1]: hash holding the map content
//   - KEYS[2]: channel the change notification is published on
//   - ARGV[1]: hash field to update
//   - ARGV[2]: comma-separated list of items to add
//
// Items already present in the stored value are skipped. Items repeated inside
// ARGV[2] itself are added only once (previously each repetition was appended,
// which broke the set contract). The stored value is otherwise left verbatim
// and new items are appended in the order they appear in ARGV[2].
//
// The hash is written and "<field>=<result>" is published only when something
// changed. A missing field always counts as a change, even when ARGV[2]
// contains no item, in which case an empty string is stored. The script
// returns the resulting value.
luaAppendUnique = redis.NewScript(`
   local v = redis.call("HGET", KEYS[1], ARGV[1])
   local seen = {}
   local changed = false

   if v then
      -- Record the items that are already stored
      for value in string.gmatch(v, "[^,]+") do
         seen[value] = true
      end
   else
      -- Missing field: it is always created, possibly with an empty value
      v = ""
      changed = true
   end

   -- Append each new item once, marking it as seen so that repetitions
   -- inside ARGV[2] are ignored as well
   for value in string.gmatch(ARGV[2], "[^,]+") do
      if not seen[value] then
         seen[value] = true
         v = (v == "") and value or (v .. "," .. value)
         changed = true
      end
   end

   -- If changes were made, update the hash and publish the event
   if changed then
      redis.call("HSET", KEYS[1], ARGV[1], v)
      redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
   end

   return v
`)

// luaDelete is the Lua script used to delete a key and return its previous
// value.
//
// KEYS[1] is the hash, KEYS[2] the notification channel and ARGV[1] the hash
// field to delete. The script publishes "<field>=" unconditionally, i.e. also
// when the field did not exist, and returns whatever HGET read before the
// deletion.
luaDelete = redis.NewScript(`
local v = redis.call("HGET", KEYS[1], ARGV[1])
redis.call("HDEL", KEYS[1], ARGV[1])
redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
return v
`)

// luaIncr is the Lua script used to increment a key and return the new value.
//
// KEYS[1] is the hash, KEYS[2] the notification channel, ARGV[1] the hash
// field and ARGV[2] the increment passed to HINCRBY. After incrementing, the
// script reads the field back with HGET, publishes "<field>=<new value>" and
// returns the value it read back.
luaIncr = redis.NewScript(`
redis.call("HINCRBY", KEYS[1], ARGV[1], ARGV[2])
local v = redis.call("HGET", KEYS[1], ARGV[1])
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..v)
return v
`)

// luaRemove is the Lua script used to remove items from an array value and
// return the result along with a flag indicating if any value was removed.
//
// Arguments as read by the script:
//   - KEYS[1]: hash holding the map content
//   - KEYS[2]: channel the change notification is published on
//   - ARGV[1]: hash field to update
//   - ARGV[2]: comma-separated list of items to remove
//
// The remaining items keep the order they had in the stored value. The
// previous implementation rebuilt the list by iterating a Lua hash table with
// pairs(), whose order is unspecified, so the stored order could change
// arbitrarily on every removal. As before, repeated items in the stored value
// are collapsed to a single occurrence (the first one).
//
// When the field exists the script always publishes "<field>=<result>". The
// field is deleted (and the result is the empty string) when no item remains,
// otherwise it is overwritten with the remaining items. When the field does
// not exist nothing is written or published.
//
// The script returns the two-element table {result, removed}.
luaRemove = redis.NewScript(`
   local v = redis.call("HGET", KEYS[1], ARGV[1])
   local removed = false

   if v then
      -- Build the set of items to drop
      local drop = {}
      for s in string.gmatch(ARGV[2], "[^,]+") do
         drop[s] = true
      end

      -- Walk the current items in their stored order, keeping the first
      -- occurrence of every item that is not being dropped
      local seen = {}
      local kept = {}
      for s in string.gmatch(v, "[^,]+") do
         if drop[s] then
            removed = true
         elseif not seen[s] then
            seen[s] = true
            kept[#kept + 1] = s
         end
      end

      -- Update the hash or delete the key if empty
      if #kept == 0 then
         redis.call("HDEL", KEYS[1], ARGV[1])
         v = ""
      else
         v = table.concat(kept, ",")
         redis.call("HSET", KEYS[1], ARGV[1], v)
      end

      -- Publish the result
      redis.call("PUBLISH", KEYS[2], ARGV[1] .. "=" .. v)
   end

   return {v, removed}
`)

// luaReset is the Lua script used to reset the map.
//
// It deletes the whole hash stored at KEYS[1] and publishes the message "*="
// on the channel KEYS[2]. The script takes no ARGV and returns nothing.
luaReset = redis.NewScript(`
redis.call("DEL", KEYS[1])
redis.call("PUBLISH", KEYS[2], "*=")
`)

// luaSet is the Lua script used to set a key and return its previous value. We
// use Lua scripts to publish notifications "at the same time" and preserve the
// order of operations (scripts are run atomically within Redis).
//
// KEYS[1] is the hash, KEYS[2] the notification channel, ARGV[1] the hash
// field and ARGV[2] the new value. The script publishes "<field>=<new value>"
// and returns whatever HGET read before the write.
luaSet = redis.NewScript(`
local v = redis.call("HGET", KEYS[1], ARGV[1])
redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[2])
return v
`)

// luaTestAndDel is the Lua script used to delete a key if it has a specific value.
//
// KEYS[1] is the hash, KEYS[2] the notification channel, ARGV[1] the hash
// field and ARGV[2] the expected value. The field is deleted and "<field>=" is
// published only when the current value equals ARGV[2]. In every case the
// script returns the value read by HGET, so the caller can compare it with the
// expected value to find out whether the deletion happened.
luaTestAndDel = redis.NewScript(`
local v = redis.call("HGET", KEYS[1], ARGV[1])
if v == ARGV[2] then
redis.call("HDEL", KEYS[1], ARGV[1])
redis.call("PUBLISH", KEYS[2], ARGV[1].."=")
end
return v
`)

// luaTestAndReset is the Lua script used to reset the map if all the given keys
// have the given values.
//
// KEYS[1] is the hash and KEYS[2] the notification channel. With
// n = (#ARGV - 1) / 2, the script reads the field names from ARGV[2..n+1] and
// the matching expected values from ARGV[n+2..2n+1]; ARGV[1] is not read by
// the script itself.
//
// The script returns 0 as soon as one field does not hold its expected value,
// leaving the hash untouched. Otherwise it deletes the hash, publishes "*="
// and returns 1.
luaTestAndReset = redis.NewScript(`
local hash = KEYS[1]
local n = (#ARGV - 1) / 2

for i = 2, n + 1 do
if redis.call("HGET", hash, ARGV[i]) ~= ARGV[i + n] then
return 0
end
end

redis.call("DEL", hash)
redis.call("PUBLISH", KEYS[2], "*=")
return 1
`)

// luaTestAndSet is the Lua script used to set a key if it has a specific value.
//
// KEYS[1] is the hash, KEYS[2] the notification channel, ARGV[1] the hash
// field, ARGV[2] the expected current value and ARGV[3] the new value. The
// field is overwritten and "<field>=<new value>" is published only when the
// current value equals ARGV[2]. In every case the script returns the value
// read by HGET before any write.
luaTestAndSet = redis.NewScript(`
local v = redis.call("HGET", KEYS[1], ARGV[1])
if v == ARGV[2] then
redis.call("HSET", KEYS[1], ARGV[1], ARGV[3])
redis.call("PUBLISH", KEYS[2], ARGV[1].."="..ARGV[3])
end
return v
`)
)

// Join retrieves the content of the replicated map with the given name and
// subscribes to updates. The local content is eventually consistent across all
// nodes that join the replicated map with the same name.
Expand Down Expand Up @@ -274,7 +103,7 @@ func Join(ctx context.Context, name string, rdb *redis.Client, opts ...MapOption

// read updates
sm.wait.Add(1)
go sm.run()
pulse.Go(ctx, sm.run)

sm.logger.Info("joined")
return sm, nil
Expand Down
Loading