goroutine控制:WaitGroup解析

最后编辑于 2024-02-29

WaitGroup是Go经常使用的并发控制工具,它非常适合等待一组goroutine结束的场景

wg的使用方法非常简单,主要的API只有Add()Wait()Done()

在下面的例子中,主goroutine等待两个子goroutine结束后才返回:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	// 代表等待2个goroutine结束
	wg.Add(2)

	go func() {
		// Do something
		time.Sleep(time.Second)

		fmt.Println("Goroutine 1 finished!")

		// 通知WaitGroup本goroutine已完成
		wg.Done()
	}()

	go func() {
		// Do something
		time.Sleep(time.Second)

		fmt.Println("Goroutine 2 finished!")
		// 通知WaitGroup本goroutine已完成
		wg.Done()
	}()

	// 阻塞当前goroutine,所有等待的子goroutine完成后才返回
	wg.Wait()
	fmt.Println("All Goroutines finished")
}

运行结果:

// 完成顺序不一定相同
Goroutine 2 finished!
Goroutine 1 finished!
All Goroutines finished

WaitGroup的实现非常简单,主要是两个计数器和一个信号量(Semaphore):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type WaitGroup struct {
	noCopy noCopy

	// 高32位是等待(调用Add的)计数器,低32位是父协程(调用Wait的)数量
	// count 和 wait count
	state1 uint64
	// 信号量指针
	state2 uint32
}

//通过state()获取上面的值
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
		return &wg.state1, &wg.state2
	}
	...
}

信号量是常见的类UNIX操作系统提供的保护线程间共享资源的工具,主要分为P, V两个操作,以下简称semap

信号量由Dijkstra提出,由于他是荷兰人,所以PV是荷兰语的缩写:

V stands for ‘Verhoog’, which can be translated as “increment”.

P stands for ‘Prolaag’, a madeup Dutch word of ‘probeer verlaag’, which can be translated as “try to decrease”.

See the naming in the original paper.

V代表增加,P代表尝试减少

  • semap = 0时,资源不可用,在此时获取资源线程会进入睡眠,在有资源可用时再唤醒线程
  • semap > 0时,资源可用,此时获取资源会将信号量减一

值得注意的是,Done()实际上就是Add(-1)

WaitGroup使用信号量实现了父子协程的唤醒功能,简单来说,父协程先调用Add(n)指示需要等待多少个子协程完成,之后创建对应数量的子协程后调用Wait()等待子协程完成:

  • 当子协程还在运行时,Wait()会让父协程阻塞在获取资源那一步。
  • 当所有子协程完成后,最后一个子协程会调用Done(),即Add(-1)。在Add()的实现中,当所有子协程已完成时,会释放对应调用Wait()的数量的资源,即让所有Wait()的父协程继续运行,父协程此时就能从Wait()返回并继续运行。

以下是节选的实现代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
func (wg *WaitGroup) Add(delta int) {
	statep, semap := wg.state()
	// 下面的if是竞争状态检测
	if race.Enabled {
		_ = *statep
		if delta < 0 {
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}

	state := atomic.AddUint64(statep, uint64(delta)<<32) // 增加等待数量
	v := int32(state >> 32) // 等待的子协程数
	w := uint32(state)      // 阻塞的父协程数

	// 一些错误检测
	if race.Enabled && delta > 0 && v == int32(delta) {
		race.Read(unsafe.Pointer(semap))
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// 当还有子协程在运行或没有父协程需要唤醒时直接返回
	if v > 0 || w == 0 {
		return
	}

	// 跑到这里的时候v必定等于0且有需要唤醒的父协程,即所有子协程已经运行完毕,需要唤醒父协程继续运行
	// 错误检测
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// 初始化count和wait count
	*statep = 0
	// 唤醒所有父协程
	for ; w != 0; w-- {
		runtime_Semrelease(semap, false, 0)
	}

}

func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

func (wg *WaitGroup) Wait() {
	// 获取数量和信号量指针
	statep, semap := wg.state()
	// 竞争状态检测
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32)
		w := uint32(state)
		// 如果没有需要等待的子协程,直接返回
		if v == 0 {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// 增加父协程数量
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			// 竞争状态检测
			if race.Enabled && w == 0 {
				race.Write(unsafe.Pointer(semap))
			}
			// 阻塞,等待子协程结束并获取资源
			runtime_Semacquire(semap)
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			// 获取资源后返回
			return
		}
	}
}

以下是一些WaitGroup的注意事项:

  • A WaitGroup must not be copied after first use.
  • Add must happen before a Wait. Calls with a negative delta, or calls with a positive delta that start when the counter is greater than zero, may happen at any time.
  • New Add calls must happen after all previous Wait calls have returned.
  • Adds must not happen concurrently with Wait

参考资料: