Skip to content

Commit 41c329f

Browse files
Better work pool pattern
1 parent 0078518 commit 41c329f

File tree

2 files changed

+168
-97
lines changed

2 files changed

+168
-97
lines changed

07-concurrency_patterns/work/main/main.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
// All material is licensed under the GNU Free Documentation License
22
// https://github.com/ArdanStudios/gotraining/blob/master/LICENSE
33

4-
// This example is provided with help by Jason Waldrip.
5-
64
// This sample program demostrates how to use the work package
75
// to use a pool of goroutines to get work done.
86
package main
97

108
import (
119
"fmt"
10+
"log"
1211
"sync"
1312
"time"
1413

@@ -30,21 +29,26 @@ type namePrinter struct {
3029
}
3130

3231
// Work implements the Worker interface.
33-
func (m *namePrinter) Work() {
32+
func (m *namePrinter) Work(id int) {
3433
fmt.Println(m.name)
3534
time.Sleep(time.Second)
3635
}
3736

37+
// logger is called by the work pool when
38+
// it has things to log.
39+
func logger(message string) {
40+
log.Println(message)
41+
}
42+
3843
// main is the entry point for all Go programs.
3944
func main() {
4045
// Create a work value with 2 goroutines.
41-
w := work.New(2)
42-
w.LogStats(100 * time.Millisecond)
46+
w, _ := work.New(2, time.Second, logger)
4347

4448
var wg sync.WaitGroup
45-
wg.Add(10 * len(names))
49+
wg.Add(100 * len(names))
4650

47-
for i := 0; i < 10; i++ {
51+
for i := 0; i < 100; i++ {
4852
// Iterate over the slice of names.
4953
for _, name := range names {
5054
// Create a namePrinter and provide the
@@ -63,6 +67,8 @@ func main() {
6367
}
6468

6569
for {
70+
// Enter a number and hit enter to change the size
71+
// of the work pool.
6672
var c int
6773
fmt.Scanf("%d", &c)
6874
if c == 0 {

07-concurrency_patterns/work/work.go

Lines changed: 155 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,151 +1,216 @@
11
// All material is licensed under the GNU Free Documentation License
22
// https://github.com/ArdanStudios/gotraining/blob/master/LICENSE
33

4-
// Package work manages a pool of goroutines to perform work.
4+
// Package work manages a pool of routines to perform work.
55
package work
66

77
import (
8+
"errors"
89
"fmt"
910
"sync"
1011
"sync/atomic"
1112
"time"
1213
)
1314

15+
const (
16+
addRoutine = 1
17+
rmvRoutine = 2
18+
)
19+
20+
// ErrorInvalidMinRoutines is the error for the invalid minRoutine parameter.
21+
var ErrorInvalidMinRoutines = errors.New("Invalid minimum number of routines")
22+
23+
// ErrorInvalidStatTime is the error for the invalid stat time parameter.
24+
var ErrorInvalidStatTime = errors.New("Invalid duration for stat time")
25+
1426
// Worker must be implemented by types that want to use
1527
// this worker processes.
1628
type Worker interface {
17-
Work()
29+
Work(id int)
1830
}
1931

20-
// Stats contains information about the work pool.
21-
type Stats struct {
22-
Goroutines int64
23-
Pending int64
24-
Active int64
25-
}
26-
27-
// Work provides a pool of goroutines that can execute any Worker
32+
// Work provides a pool of routines that can execute any Worker
2833
// tasks that are submitted.
2934
type Work struct {
30-
tasks chan Worker // Unbuffered channel that work is sent into.
31-
kill chan struct{} // Unbuffered channel to signal for a goroutine to die.
32-
shutdown chan struct{} // Closed when the Work pool is being shutdown.
33-
wg sync.WaitGroup // Manages the number of goroutines for shutdown.
34-
mutex sync.Mutex // Provides synchronization for managing the pool.
35-
stats Stats // Stats about the health of the pool.
35+
minRoutines int // Minumum number of routines always in the pool.
36+
statTime time.Duration // Time to display stats.
37+
counter int // Maintains a running total number of routines ever created.
38+
tasks chan Worker // Unbuffered channel that work is sent into.
39+
control chan int // Unbuffered channel that work for the manager is send into.
40+
kill chan struct{} // Unbuffered channel to signal for a goroutine to die.
41+
shutdown chan struct{} // Closed when the Work pool is being shutdown.
42+
wg sync.WaitGroup // Manages the number of routines for shutdown.
43+
routines int64 // Number of routines
44+
active int64 // Active number of routines in the work pool.
45+
pending int64 // Pending number of routines waiting to submit work.
46+
47+
logFunc func(message string) // Function called to providing logging support
3648
}
3749

3850
// New creates a new Worker.
39-
func New(goroutines int) *Work {
40-
w := Work{
41-
tasks: make(chan Worker),
42-
kill: make(chan struct{}),
43-
shutdown: make(chan struct{}),
51+
func New(minRoutines int, statTime time.Duration, logFunc func(message string)) (*Work, error) {
52+
if minRoutines <= 0 {
53+
return nil, ErrorInvalidMinRoutines
4454
}
4555

46-
w.Add(goroutines)
47-
48-
return &w
49-
}
50-
51-
// LogStats display work pool stats on the specified duration.
52-
func (w *Work) LogStats(d time.Duration) {
53-
w.wg.Add(1)
56+
if statTime < time.Millisecond {
57+
return nil, ErrorInvalidStatTime
58+
}
5459

55-
go func() {
56-
for {
57-
select {
58-
case <-w.shutdown:
59-
w.wg.Done()
60-
return
60+
w := Work{
61+
minRoutines: minRoutines,
62+
statTime: statTime,
63+
tasks: make(chan Worker),
64+
control: make(chan int),
65+
kill: make(chan struct{}),
66+
shutdown: make(chan struct{}),
67+
logFunc: logFunc,
68+
}
6169

62-
case <-time.After(d):
63-
s := w.Stats()
64-
fmt.Printf("G[%d] P[%d] A[%d]\n", s.Goroutines, s.Pending, s.Active)
65-
}
66-
}
67-
}()
68-
}
70+
// Start the manager.
71+
w.manager()
6972

70-
// Stats returns the current status for the work pool.
71-
func (w *Work) Stats() Stats {
72-
var s Stats
73-
s.Goroutines = atomic.LoadInt64(&w.stats.Goroutines)
74-
s.Pending = atomic.LoadInt64(&w.stats.Pending)
75-
s.Active = atomic.LoadInt64(&w.stats.Active)
73+
// Add the routines.
74+
w.Add(minRoutines)
7675

77-
return s
76+
return &w, nil
7877
}
7978

80-
// Add creates goroutines to process work or sets a count for
81-
// goroutines to terminate.
82-
func (w *Work) Add(goroutines int) {
83-
if goroutines == 0 {
79+
// Add creates routines to process work or sets a count for
80+
// routines to terminate.
81+
func (w *Work) Add(routines int) {
82+
if routines == 0 {
8483
return
8584
}
8685

87-
w.mutex.Lock()
88-
{
89-
if goroutines > 0 {
90-
// We are adding goroutines to the pool.
91-
w.wg.Add(goroutines)
92-
atomic.AddInt64(&w.stats.Goroutines, int64(goroutines))
93-
94-
for i := 0; i < goroutines; i++ {
95-
go w.work()
96-
}
97-
} else {
98-
// We are removing goroutines from the pool.
99-
goroutines = goroutines * -1
100-
current := int(atomic.LoadInt64(&w.stats.Goroutines))
101-
if goroutines > current {
102-
goroutines = current
103-
}
86+
cmd := addRoutine
87+
if routines < 0 {
88+
routines = routines * -1
89+
cmd = rmvRoutine
90+
}
10491

105-
// Send the kill signal and wait for these goroutines
106-
// to get the signal to die.
107-
for i := 0; i < goroutines; i++ {
108-
w.kill <- struct{}{}
109-
}
110-
}
92+
for i := 0; i < routines; i++ {
93+
w.control <- cmd
11194
}
112-
w.mutex.Unlock()
11395
}
11496

11597
// work performs the users work and keeps stats.
116-
func (w *Work) work() {
98+
func (w *Work) work(id int) {
11799
done:
118100
for {
119101
select {
120-
case t, ok := <-w.tasks:
121-
if !ok {
122-
break done
102+
case t := <-w.tasks:
103+
atomic.AddInt64(&w.active, 1)
104+
{
105+
// Perform the work.
106+
t.Work(id)
123107
}
124-
125-
atomic.AddInt64(&w.stats.Active, 1)
126-
t.Work()
127-
atomic.AddInt64(&w.stats.Active, -1)
108+
atomic.AddInt64(&w.active, -1)
128109

129110
case <-w.kill:
130111
break done
131112
}
132113
}
133114

134-
atomic.AddInt64(&w.stats.Goroutines, -1)
115+
// Decrement the counts.
116+
atomic.AddInt64(&w.routines, -1)
135117
w.wg.Done()
118+
119+
w.log("Worker : Shutting Down")
136120
}
137121

138122
// Run wait for the goroutine pool to take the work
139123
// to be executed.
140124
func (w *Work) Run(work Worker) {
141-
atomic.AddInt64(&w.stats.Pending, 1)
142-
w.tasks <- work
143-
atomic.AddInt64(&w.stats.Pending, -1)
125+
atomic.AddInt64(&w.pending, 1)
126+
{
127+
w.tasks <- work
128+
}
129+
atomic.AddInt64(&w.pending, -1)
144130
}
145131

146132
// Shutdown waits for all the workers to finish.
147133
func (w *Work) Shutdown() {
148-
close(w.tasks)
149134
close(w.shutdown)
150135
w.wg.Wait()
151136
}
137+
138+
// manager controls changes to the work pool including stats
139+
// and shutting down.
140+
func (w *Work) manager() {
141+
w.wg.Add(1)
142+
143+
go func() {
144+
w.log("Work Manager : Started")
145+
146+
// Create a timer to run stats.
147+
timer := time.NewTimer(w.statTime)
148+
149+
for {
150+
select {
151+
case <-w.shutdown:
152+
// Capture the current number of routines.
153+
routines := int(atomic.LoadInt64(&w.routines))
154+
155+
// Send a kill to all the existing routines.
156+
for i := 0; i < routines; i++ {
157+
w.kill <- struct{}{}
158+
}
159+
160+
// Decrement the waitgroup and kill the manager.
161+
w.wg.Done()
162+
return
163+
164+
case c := <-w.control:
165+
switch c {
166+
case addRoutine:
167+
w.log("Work Manager : Add Routine")
168+
169+
// Capture a unique id.
170+
w.counter++
171+
172+
// Add to the counts.
173+
w.wg.Add(1)
174+
atomic.AddInt64(&w.routines, 1)
175+
176+
// Create the routine.
177+
go w.work(w.counter)
178+
179+
case rmvRoutine:
180+
w.log("Work Manager : Remove Routine")
181+
182+
// Capture the number of routines.
183+
routines := int(atomic.LoadInt64(&w.routines))
184+
185+
// Are there routines to remove.
186+
if routines <= w.minRoutines {
187+
w.log("Work Manager : Reached Minimum Can't Remove")
188+
break
189+
}
190+
191+
// Send a kill signal to remove a routine.
192+
w.kill <- struct{}{}
193+
}
194+
195+
case <-timer.C:
196+
// Capture the stats.
197+
routines := atomic.LoadInt64(&w.routines)
198+
pending := atomic.LoadInt64(&w.pending)
199+
active := atomic.LoadInt64(&w.active)
200+
201+
// Display the stats.
202+
w.log(fmt.Sprintf("Work Manager : Stats : G[%d] P[%d] A[%d]", routines, pending, active))
203+
204+
// Reset the clock.
205+
timer.Reset(w.statTime)
206+
}
207+
}
208+
}()
209+
}
210+
211+
// log sending logging messages back to the client.
212+
func (w *Work) log(message string) {
213+
if w.logFunc != nil {
214+
w.logFunc(message)
215+
}
216+
}

0 commit comments

Comments
 (0)