WaitGroup是Go经常使用的并发控制工具,它非常适合等待一组goroutine结束的场景
wg的使用方法非常简单,主要的API只有Add()
,Wait()
和Done()
在下面的例子中,主goroutine等待两个子goroutine结束后才返回:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
| package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// 代表等待2个goroutine结束
wg.Add(2)
go func() {
// Do something
time.Sleep(time.Second)
fmt.Println("Goroutine 1 finished!")
// 通知WaitGroup本goroutine已完成
wg.Done()
}()
go func() {
// Do something
time.Sleep(time.Second)
fmt.Println("Goroutine 2 finished!")
// 通知WaitGroup本goroutine已完成
wg.Done()
}()
// 阻塞当前goroutine,所有等待的子goroutine完成后才返回
wg.Wait()
fmt.Println("All Goroutines finished")
}
|
运行结果:
// 完成顺序不一定相同
Goroutine 2 finished!
Goroutine 1 finished!
All Goroutines finished
WaitGroup的实现非常简单,主要是两个计数器和一个信号量(Semaphore):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| type WaitGroup struct {
noCopy noCopy
// 高32位是等待(调用Add的)计数器,低32位是父协程(调用Wait的)数量
// count 和 wait count
state1 uint64
// 信号量指针
state2 uint32
}
//通过state()获取上面的值
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return &wg.state1, &wg.state2
}
...
}
|
信号量是常见的类UNIX操作系统提供的保护线程间共享资源的工具,主要分为P
, V
两个操作,以下简称semap
信号量由Dijkstra提出,由于他是荷兰人,所以PV是荷兰语的缩写:
V stands for ‘Verhoog’, which can be translated as “increment”.
P stands for ‘Prolaag’, a madeup Dutch word of ‘probeer verlaag’, which can be translated as “try to decrease”.
See the naming in the original paper.
即V
代表增加,P
代表尝试减少
- semap = 0时,资源不可用,在此时获取资源线程会进入睡眠,在有资源可用时再唤醒线程
- semap > 0时,资源可用,此时获取资源会将信号量减一
值得注意的是,Done()
实际上就是Add(-1)
。
WaitGroup使用信号量实现了父子协程的唤醒功能,简单来说,父协程先调用Add(n)
指示需要等待多少个子协程完成,之后创建对应数量的子协程后调用Wait()
等待子协程完成:
- 当子协程还在运行时,
Wait()
会让父协程阻塞在获取资源那一步。 - 当所有子协程完成后,最后一个子协程会调用
Done()
,即Add(-1)
。在Add()
的实现中,当所有子协程已完成时,会释放对应调用Wait()
的数量的资源,即让所有Wait()
的父协程继续运行,父协程此时就能从Wait()
返回并继续运行。
以下是节选的实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
| func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 下面的if是竞争状态检测
if race.Enabled {
_ = *statep
if delta < 0 {
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
state := atomic.AddUint64(statep, uint64(delta)<<32) // 增加等待数量
v := int32(state >> 32) // 等待的子协程数
w := uint32(state) // 阻塞的父协程数
// 一些错误检测
if race.Enabled && delta > 0 && v == int32(delta) {
race.Read(unsafe.Pointer(semap))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 当还有子协程在运行或没有父协程需要唤醒时直接返回
if v > 0 || w == 0 {
return
}
// 跑到这里的时候v必定等于0且有需要唤醒的父协程,即所有子协程已经运行完毕,需要唤醒父协程继续运行
// 错误检测
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 初始化count和wait count
*statep = 0
// 唤醒所有父协程
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
func (wg *WaitGroup) Wait() {
// 获取数量和信号量指针
statep, semap := wg.state()
// 竞争状态检测
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
// 如果没有需要等待的子协程,直接返回
if v == 0 {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 增加父协程数量
if atomic.CompareAndSwapUint64(statep, state, state+1) {
// 竞争状态检测
if race.Enabled && w == 0 {
race.Write(unsafe.Pointer(semap))
}
// 阻塞,等待子协程结束并获取资源
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
// 获取资源后返回
return
}
}
}
|
以下是一些WaitGroup的注意事项:
- A WaitGroup must not be copied after first use.
- Add must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time.
- New Add calls must happen after all previous Wait calls have returned.
- Adds must not happen concurrently with Wait
参考资料: