Skip to content

Commit ee6e1a3

Browse files
dvyukov authored and rsc committed
sync: add fast paths to WaitGroup
benchmark old ns/op new ns/op delta BenchmarkWaitGroupUncontended 93.50 33.60 -64.06% BenchmarkWaitGroupUncontended-2 44.30 16.90 -61.85% BenchmarkWaitGroupUncontended-4 21.80 8.47 -61.15% BenchmarkWaitGroupUncontended-8 12.10 4.86 -59.83% BenchmarkWaitGroupUncontended-16 7.38 3.35 -54.61% BenchmarkWaitGroupAddDone 58.40 33.70 -42.29% BenchmarkWaitGroupAddDone-2 293.00 85.80 -70.72% BenchmarkWaitGroupAddDone-4 243.00 51.10 -78.97% BenchmarkWaitGroupAddDone-8 236.00 52.20 -77.88% BenchmarkWaitGroupAddDone-16 215.00 43.30 -79.86% BenchmarkWaitGroupAddDoneWork 826.00 794.00 -3.87% BenchmarkWaitGroupAddDoneWork-2 450.00 424.00 -5.78% BenchmarkWaitGroupAddDoneWork-4 277.00 220.00 -20.58% BenchmarkWaitGroupAddDoneWork-8 440.00 116.00 -73.64% BenchmarkWaitGroupAddDoneWork-16 569.00 66.50 -88.31% BenchmarkWaitGroupWait 29.00 8.04 -72.28% BenchmarkWaitGroupWait-2 74.10 4.15 -94.40% BenchmarkWaitGroupWait-4 117.00 2.30 -98.03% BenchmarkWaitGroupWait-8 111.00 1.31 -98.82% BenchmarkWaitGroupWait-16 104.00 1.27 -98.78% BenchmarkWaitGroupWaitWork 802.00 792.00 -1.25% BenchmarkWaitGroupWaitWork-2 411.00 401.00 -2.43% BenchmarkWaitGroupWaitWork-4 210.00 199.00 -5.24% BenchmarkWaitGroupWaitWork-8 206.00 105.00 -49.03% BenchmarkWaitGroupWaitWork-16 334.00 54.40 -83.71% R=rsc CC=golang-dev https://golang.org/cl/4672050
1 parent 92c6061 commit ee6e1a3

File tree

7 files changed

+271
-15
lines changed

7 files changed

+271
-15
lines changed

src/pkg/sync/atomic/asm_386.s

+9
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,12 @@ addloop:
8585
MOVL BX, retlo+12(FP)
8686
MOVL CX, rethi+16(FP)
8787
RET
88+
89+
TEXT ·LoadInt32(SB),7,$0
90+
JMP ·LoadUint32(SB)
91+
92+
TEXT ·LoadUint32(SB),7,$0
93+
MOVL addrptr+0(FP), AX
94+
MOVL 0(AX), AX
95+
MOVL AX, ret+4(FP)
96+
RET

src/pkg/sync/atomic/asm_amd64.s

+10
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,13 @@ TEXT ·AddUint64(SB),7,$0
5757
ADDQ AX, CX
5858
MOVQ CX, ret+16(FP)
5959
RET
60+
61+
TEXT ·LoadInt32(SB),7,$0
62+
JMP ·LoadUint32(SB)
63+
64+
TEXT ·LoadUint32(SB),7,$0
65+
MOVQ addrptr+0(FP), AX
66+
MOVL 0(AX), AX
67+
MOVL AX, ret+8(FP)
68+
RET
69+

src/pkg/sync/atomic/asm_linux_arm.s

+13
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,16 @@ TEXT ·AddInt64(SB),7,$0
8383

8484
TEXT ·AddUint64(SB),7,$0
8585
B ·armAddUint64(SB)
86+
87+
TEXT ·LoadInt32(SB),7,$0
88+
B ·LoadUint32(SB)
89+
90+
TEXT ·LoadUint32(SB),7,$0
91+
MOVW addrptr+0(FP), R2
92+
loadloop1:
93+
MOVW 0(R2), R0
94+
MOVW R0, R1
95+
BL cas<>(SB)
96+
BCC loadloop1
97+
MOVW R0, val+4(FP)
98+
RET

src/pkg/sync/atomic/atomic_test.go

+102
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
308308
}
309309
}
310310

311+
func TestLoadInt32(t *testing.T) {
312+
var x struct {
313+
before int32
314+
i int32
315+
after int32
316+
}
317+
x.before = magic32
318+
x.after = magic32
319+
for delta := int32(1); delta+delta > delta; delta += delta {
320+
k := LoadInt32(&x.i)
321+
if k != x.i {
322+
t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
323+
}
324+
x.i += delta
325+
}
326+
if x.before != magic32 || x.after != magic32 {
327+
t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
328+
}
329+
}
330+
331+
func TestLoadUint32(t *testing.T) {
332+
var x struct {
333+
before uint32
334+
i uint32
335+
after uint32
336+
}
337+
x.before = magic32
338+
x.after = magic32
339+
for delta := uint32(1); delta+delta > delta; delta += delta {
340+
k := LoadUint32(&x.i)
341+
if k != x.i {
342+
t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
343+
}
344+
x.i += delta
345+
}
346+
if x.before != magic32 || x.after != magic32 {
347+
t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
348+
}
349+
}
350+
311351
// Tests of correct behavior, with contention.
312352
// (Is the function atomic?)
313353
//
@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
537577
}
538578
}
539579
}
580+
581+
func hammerLoadInt32(t *testing.T, uval *uint32) {
582+
val := (*int32)(unsafe.Pointer(uval))
583+
for {
584+
v := LoadInt32(val)
585+
vlo := v & ((1 << 16) - 1)
586+
vhi := v >> 16
587+
if vlo != vhi {
588+
t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
589+
}
590+
new := v + 1 + 1<<16
591+
if vlo == 1e4 {
592+
new = 0
593+
}
594+
if CompareAndSwapInt32(val, v, new) {
595+
break
596+
}
597+
}
598+
}
599+
600+
func hammerLoadUint32(t *testing.T, val *uint32) {
601+
for {
602+
v := LoadUint32(val)
603+
vlo := v & ((1 << 16) - 1)
604+
vhi := v >> 16
605+
if vlo != vhi {
606+
t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
607+
}
608+
new := v + 1 + 1<<16
609+
if vlo == 1e4 {
610+
new = 0
611+
}
612+
if CompareAndSwapUint32(val, v, new) {
613+
break
614+
}
615+
}
616+
}
617+
618+
func TestHammerLoad(t *testing.T) {
619+
tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
620+
n := 100000
621+
if testing.Short() {
622+
n = 10000
623+
}
624+
const procs = 8
625+
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
626+
for _, tt := range tests {
627+
c := make(chan int)
628+
var val uint32
629+
for p := 0; p < procs; p++ {
630+
go func() {
631+
for i := 0; i < n; i++ {
632+
tt(t, &val)
633+
}
634+
c <- 1
635+
}()
636+
}
637+
for p := 0; p < procs; p++ {
638+
<-c
639+
}
640+
}
641+
}

src/pkg/sync/atomic/doc.go

+6
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
5656
// AddUintptr atomically adds delta to *val and returns the new value.
5757
func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
5858

59+
// LoadInt32 atomically loads *addr.
60+
func LoadInt32(addr *int32) (val int32)
61+
62+
// LoadUint32 atomically loads *addr.
63+
func LoadUint32(addr *uint32) (val uint32)
64+
5965
// Helper for ARM. Linker will discard on other systems
6066
func panic64() {
6167
panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")

src/pkg/sync/waitgroup.go

+26-15
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44

55
package sync
66

7-
import "runtime"
7+
import (
8+
"runtime"
9+
"sync/atomic"
10+
)
811

912
// A WaitGroup waits for a collection of goroutines to finish.
1013
// The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@ import "runtime"
2831
//
2932
type WaitGroup struct {
3033
m Mutex
31-
counter int
32-
waiters int
34+
counter int32
35+
waiters int32
3336
sema *uint32
3437
}
3538

@@ -48,19 +51,19 @@ type WaitGroup struct {
4851
// Add adds delta, which may be negative, to the WaitGroup counter.
4952
// If the counter becomes zero, all goroutines blocked on Wait() are released.
5053
func (wg *WaitGroup) Add(delta int) {
51-
wg.m.Lock()
52-
if delta < -wg.counter {
53-
wg.m.Unlock()
54+
v := atomic.AddInt32(&wg.counter, int32(delta))
55+
if v < 0 {
5456
panic("sync: negative WaitGroup count")
5557
}
56-
wg.counter += delta
57-
if wg.counter == 0 && wg.waiters > 0 {
58-
for i := 0; i < wg.waiters; i++ {
59-
runtime.Semrelease(wg.sema)
60-
}
61-
wg.waiters = 0
62-
wg.sema = nil
58+
if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
59+
return
6360
}
61+
wg.m.Lock()
62+
for i := int32(0); i < wg.waiters; i++ {
63+
runtime.Semrelease(wg.sema)
64+
}
65+
wg.waiters = 0
66+
wg.sema = nil
6467
wg.m.Unlock()
6568
}
6669

@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
7174

7275
// Wait blocks until the WaitGroup counter is zero.
7376
func (wg *WaitGroup) Wait() {
77+
if atomic.LoadInt32(&wg.counter) == 0 {
78+
return
79+
}
7480
wg.m.Lock()
75-
if wg.counter == 0 {
81+
atomic.AddInt32(&wg.waiters, 1)
82+
// This code is racing with the unlocked path in Add above.
83+
// The code above modifies counter and then reads waiters.
84+
// We must modify waiters and then read counter (the opposite order)
85+
// to avoid missing an Add.
86+
if atomic.LoadInt32(&wg.counter) == 0 {
87+
atomic.AddInt32(&wg.waiters, -1)
7688
wg.m.Unlock()
7789
return
7890
}
79-
wg.waiters++
8091
if wg.sema == nil {
8192
wg.sema = new(uint32)
8293
}

src/pkg/sync/waitgroup_test.go

+105
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
package sync_test
66

77
import (
8+
"runtime"
89
. "sync"
10+
"sync/atomic"
911
"testing"
1012
)
1113

@@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) {
5860
wg.Done()
5961
t.Fatal("Should panic")
6062
}
63+
64+
func BenchmarkWaitGroupUncontended(b *testing.B) {
65+
type PaddedWaitGroup struct {
66+
WaitGroup
67+
pad [128]uint8
68+
}
69+
const CallsPerSched = 1000
70+
procs := runtime.GOMAXPROCS(-1)
71+
N := int32(b.N / CallsPerSched)
72+
c := make(chan bool, procs)
73+
for p := 0; p < procs; p++ {
74+
go func() {
75+
var wg PaddedWaitGroup
76+
for atomic.AddInt32(&N, -1) >= 0 {
77+
runtime.Gosched()
78+
for g := 0; g < CallsPerSched; g++ {
79+
wg.Add(1)
80+
wg.Done()
81+
wg.Wait()
82+
}
83+
}
84+
c <- true
85+
}()
86+
}
87+
for p := 0; p < procs; p++ {
88+
<-c
89+
}
90+
}
91+
92+
func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
93+
const CallsPerSched = 1000
94+
procs := runtime.GOMAXPROCS(-1)
95+
N := int32(b.N / CallsPerSched)
96+
c := make(chan bool, procs)
97+
var wg WaitGroup
98+
for p := 0; p < procs; p++ {
99+
go func() {
100+
foo := 0
101+
for atomic.AddInt32(&N, -1) >= 0 {
102+
runtime.Gosched()
103+
for g := 0; g < CallsPerSched; g++ {
104+
wg.Add(1)
105+
for i := 0; i < localWork; i++ {
106+
foo *= 2
107+
foo /= 2
108+
}
109+
wg.Done()
110+
}
111+
}
112+
c <- foo == 42
113+
}()
114+
}
115+
for p := 0; p < procs; p++ {
116+
<-c
117+
}
118+
}
119+
120+
func BenchmarkWaitGroupAddDone(b *testing.B) {
121+
benchmarkWaitGroupAddDone(b, 0)
122+
}
123+
124+
func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
125+
benchmarkWaitGroupAddDone(b, 100)
126+
}
127+
128+
func benchmarkWaitGroupWait(b *testing.B, localWork int) {
129+
const CallsPerSched = 1000
130+
procs := runtime.GOMAXPROCS(-1)
131+
N := int32(b.N / CallsPerSched)
132+
c := make(chan bool, procs)
133+
var wg WaitGroup
134+
wg.Add(procs)
135+
for p := 0; p < procs; p++ {
136+
go wg.Done()
137+
}
138+
for p := 0; p < procs; p++ {
139+
go func() {
140+
foo := 0
141+
for atomic.AddInt32(&N, -1) >= 0 {
142+
runtime.Gosched()
143+
for g := 0; g < CallsPerSched; g++ {
144+
wg.Wait()
145+
for i := 0; i < localWork; i++ {
146+
foo *= 2
147+
foo /= 2
148+
}
149+
}
150+
}
151+
c <- foo == 42
152+
}()
153+
}
154+
for p := 0; p < procs; p++ {
155+
<-c
156+
}
157+
}
158+
159+
func BenchmarkWaitGroupWait(b *testing.B) {
160+
benchmarkWaitGroupWait(b, 0)
161+
}
162+
163+
func BenchmarkWaitGroupWaitWork(b *testing.B) {
164+
benchmarkWaitGroupWait(b, 100)
165+
}

0 commit comments

Comments
 (0)