type WaitGroup struct 🚀

  1. WaitGroup 等待 goroutine 集合完成。
  2. main goroutine 调用 Add 来设置要等待的 goroutine 的数量。然后每个 goroutine 运行并在完成时调用 Done。
  3. 同时,可以使用 Wait 来阻塞,直到所有 goroutine 完成。
  4. WaitGroup 在第一次使用后不能被复制。
  5. WaitGroup:用于等待一组线程的结束,父线程调用Add方法来设定应等待的线程的数量。
  6. 每个被等待的线程在结束时应调用Done方法,同时,主线程里可以调用Wait方法阻塞至所有线程结束。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
    // WaitGroup 首次使用后不能被拷贝的原因【是 &state2 地址会发生变化】
    // semaPhore 要求 &state2 地址是一个,不然从其中唤醒 goroutine 会找不到
    noCopy noCopy	// 编译器检查WaitGroup对象是否被拷贝过

    // 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
    // 64-bit atomic operations require 64-bit alignment, but 32-bit
    // compilers only guarantee that 64-bit fields are 32-bit aligned.
    // For this reason on 32 bit architectures we need to check in state()
    // if state1 is aligned or not, and dynamically "swap" the field order if
    // needed.
    //
    // 64位值: 高32位是 counter 计数,低32位是 waiter 计数。
    // 64-bit = ( uint64(counter) << 32 ) | uint32( waiter )
    // 【64位原子操作需要64位对齐】,但是32位编译器只保证64位字段是32位对齐的。
    // 因此,在32位架构上,我们需要检查在state()是否将state1对齐,并在需要时动态 “交换” 字段顺序。
    // 意思是:在32位操作系统下使用64位原子操作时,被操作地址必须是64位对齐的,不然会宕机。
    state1 uint64	// (uint64(counter) << 32) | uint32(waiter)
    state2 uint32
}

type noCopy struct

  1. noCopy 可以嵌入到第一次使用后不得复制的结构中.
  2. sync/cond.go文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

// Lock is a no-op used by -copylocks checker from `go vet`.
//
// Lock 是 `go vet` 的 -copylocks 检查器使用的无操作
func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}

WaitGroup 结构布局

  1. Semaphore:信号量用于挂起 Wait 函数的调用者的 goroutine。(需要一个 uint32 类型)
  2. Counter:等待的运行的 goroutine 数量,该值在 Add 和 Done 函数中被操作。
  3. Waiter:等待在 Semaphore 中的 goroutine 数量。
  4. 为什么需要这么设计呢?因为 Waiter + Counter 是一个整体作为64位,被原子操作,而64位原子操作又要求必须是64位对齐的。(具体参看state()源码)
  5. 在1.22版本中,state1也就是Waiter和Counter使用atomic.Uint64替代了。以下图是1.18前版本的。

Sema、Counter、Waiter

  1. WaitGroup 结构设计目的是等待集合中的goroutine完成,因此 main goroutine 就是产生这个等待集合的,它等待集合中的所有goroutine完成再继续后续。
  2. Counter 则是计数当前等待集合中的goroutine的数量,一个goroutine被创建放入集合时就应该计数Counter值,一个goroutine完成时也应该计数Counter。
  3. 因此goroutine被加入到等待集合中都是在main goroutine中操作包括Counter的计数,一个goroutine完成计数Counter应该在这goroutine完成时操作。
  4. Semaphore 则是 main goroutine 在调用Wait()函数时,main goroutine需要等待等待集合goroutine完成而挂起在Semaphore池子里。
  5. Waiter 则是记录Semaphore中等待的goroutine的数量。
  6. Add()Done() 函数是计数Counter的相关方法,在等待集合goroutine中最后一个goroutine完成时,如果有等待在Semaphore的goroutine应该全部唤醒。
  7. Wait() 函数是等待等待集合goroutine完成,main goroutine主动挂起自己的相关逻辑。

state()

  1. State 返回指向存储在 wg.state* 中的 State 和 sema 字段的指针。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// state returns pointers to the state and sema fields stored within wg.state*.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
    // unsafe.Alignof(wg.state1) == 8:说明state1字段对齐为8字节,这种情况是64位平台
    // uintptr(unsafe.Pointer(&wg.state1))%8 == 0:说明wg.state1的地址按照8字节对齐的,可能是64或32平台
    if unsafe.Alignof(wg.state1) == 8 || uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        // state1 is 64-bit aligned: nothing to do.
        // state1 是 64 位对齐的:无事可做
        return &wg.state1, &wg.state2	// state1是64位对齐,state1就是state,state2就是sema
    } else {
        // state1 is 32-bit aligned but not 64-bit aligned: this means that
        // (&state1)+4 is 64-bit aligned.
        // 
        // State1是32位对齐,而不是64位对齐:这意味着 (&state1)+4 是64位对齐。
        // 这种情况处理就是为了满足后续:64位原子操作需要64位对齐
        state := (*[3]uint32)(unsafe.Pointer(&wg.state1))	
        return (*uint64)(unsafe.Pointer(&state[1])), &state[0]
    }
}

Add()

  1. Add 将 delta (可能是负数)添加到 WaitGroup counter。
  2. 如果 counter 变为0,所有在等待时被阻塞的 goroutine 都会被释放。如果 counter 变为负数,则会 painc。
  3. 请注意,当 counter 为0时,delta为正的调用必须发生在等待之前。
  4. 使用负的delta调用,或者从 counter 大于零开始使用正的delta调用,都可能在任何时候发生。
  5. 通常,这意味着对Add的调用应该在创建goroutine语句或其他要等待的事件之前执行。
  6. 如果重用一个 WaitGroup 来等待几个独立的事件集,那么新的Add调用必须在所有先前的wait调用都返回之后发生。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
    // 1) 原子操作 statep += delta
    
    // statep *uint64:【Waiter + Counter】
    // semap *uint32:【semaphore】
    // *statep = (uint64(counter) << 32) | uint32(waiter)
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        if delta < 0 {
            // Synchronize decrements with Wait.
            race.ReleaseMerge(unsafe.Pointer(wg))
        }
        race.Disable()
        defer race.Enable()
    }
    
    // 原子操作 {counter += delta; state = counter;} delta 可能为负数
    // 这里的64位原子操作也就是为什么需要 state 函数的原因
    state := atomic.AddUint64(statep, uint64(delta)<<32) // counter += delta
    // 注意这里是 int32 类型,原因是 delta 是来自用户传入,可能最后导致 v < 0 情况发生
    v := int32(state >> 32)	 // counter 高32位
    w := uint32(state)		 // waiter 低32位
    
    if race.Enabled && delta > 0 && v == int32(delta) {
        // The first increment must be synchronized with Wait.
        // Need to model this as a read, because there can be
        // several concurrent wg.counter transitions from 0.
        race.Read(unsafe.Pointer(semap))
    }
    
    // 2) counter 不能为负数
    
    // counter 不应该出现为负数情况
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    
    // 3) Add(>0) 和 Wait 函数不能并发调用
    
    // Add(>0) 和 Wait() 函数并发被调用
    //  1. w != 0 			存在等待的waiter
    //  2. delta > 0 		本次调用是添加不是减少
    //  3. v == int32(delta) 当前添加的数量就是总数量,之前为0
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    // 4)  if ( v > 0 || (v == 0 && w == 0) ) return
    
    // Counter计数数量大于零 或 没有等待的waiter直接返回
    //  1. v > 0:代表不是最后一个,因此直接返回
    //  2. v == 0 && w == 0:也是直接返回
    if v > 0 || w == 0 {	// 这里的条件比较关键
        return
    }
    
    // 5) v == 0 && w > 0
    // 这种情况需要去把 semap 上面挂起的 goroutine 全部唤醒
    
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // 
    // 当 waiters > 0 时,此 goroutine 将 counter 设置为 0
    // 现在不能有并发的 state 突变:
    // 	- Add 不能与 Wait 同时发生
    //  - 如果看到 counter == 0,Wait 不会增加 waiter
    // 仍然要做一个廉价的健全检查来检测WaitGroup的滥用。
    //
    // WaitGroup误用:Add与Wait并发调用
    // 这种情况发生在:最后一个goroutine运行完需要唤醒等待的waiter此时Add方法有被调用
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    
    // Reset waiters count to 0.
    *statep = 0	// 重置 Counter = 0,Waiter = 0
    for ; w != 0; w-- {
        // false正常模式
        // 将等待在 semaphore 中的goroutine取出等待调度
        runtime_Semrelease(semap, false, 0)
    }
}

Done()

  1. Done 将 WaitGroup counter 减1。
1
2
3
4
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

Wait()

  1. Wait 阻塞,直到 WaitGroup counter 为0。
  2. Wait 函数的调用期间,可能处于多个goroutine在调用 Done 函数。
  3. Wait() 函数允许被多个线程同时调用。Wait() 函数一定要所有的Counter都标记完毕后才调用该方法
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    // statep *uint64:【Waiter + Counter】
    // semap *uint32:【semaphore】
    // *statep = (uint64(counter) << 32) | uint32(waiter)
    statep, semap := wg.state()
    if race.Enabled {
        _ = *statep // trigger nil deref early
        race.Disable()
    }
    
    for {
        state := atomic.LoadUint64(statep)	// 原子读取 statep
        v := int32(state >> 32)		// counter
        w := uint32(state)			// waiter
        
        // 这种情况,比如在调用Wait函数时先调用sleep睡眠很长一段时间
        // 也就是 Wait 函数还没开始执行 其他goroutine 已经执行完了,因此直接返回即可
        if v == 0 {	
            // Counter is 0, no need to wait.
            // 
            // 计数器为 0,无需等待
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        
        // Increment waiters count.
        // 
        // 增加waiter数量,正常逻辑下在这里等待的只有main goroutine一个
        // 这里可能会失败,可能有很多goroutine正在调用Add或Done方法修改Counter导致这里原子操作失败
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(semap))
            }
            // 该方法最后会调用semacquire1,我们在sync.Mutex中已经讨论过
            // 这里会把当前goroutine入队,注意这里入队的是main goroutine
            // runtime_Semacquire 将当前 goroutine 加入到 semaphore 的尾部
            runtime_Semacquire(semap)	// 主线程在这里被调离工作线程,下次恢复时从这里接到执行
            
            // 当前goroutine被Done函数唤醒时,一定是 *statep == 0,不然流程有问题
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            
            return	// 直接返回到调用Wait方法位置处
        }
    }
}

使用示例

  1. WaitGroup:用于线程总同步,它等待一组线程集合完成,才会继续向下执行。
  2. 主线程调用 Add() 方法来设置等待的协程数量:
    • 然后每个协程运行,并在完成后调用 Done() 方法,Add(-1)Done() 效果一致,都表示等到的协程数量减少一个。
    • 同时,Wait() 方法用来阻塞主线程,直到所有协程完成才会向下执行。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    
    wg.Add(10)	// (&wg).Add(10) -> (*WaitGroup).Add(&wg, 10)
    
    for i := 0; i < 10; i++ {
        //wg.Add(1)	// (&wg).Add(1)	-> (*WaitGroup).Add(&wg, 1)
        go func(i int) {
            defer wg.Done()	// (&wg).Done() -> (*WaitGroup).Done(&wg)
            fmt.Println(i)
        }(i)
    }
    
    fmt.Println("我在循环外")
    
    // 阻塞主线程,等所有协程完成
    wg.Wait()	// (&wg).Wait() -> (*WaitGroup).Wait(&wg)
    
    // Output:
    // 1
    // 9
    // 3
    // 4
    // 我在循环外
    // 5
    // 6
    // 7
    // 8
    // 2
    // 0
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
    "fmt"
    "net/http"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    var urls []string = []string{
        "http://baidu.com/",
        "https://wzapi.myzx.cn/",
    }

    wg.Add(len(urls))
    
    for _, url := range urls {
        // wg.Add(1)
        
        go func(url string) {
            defer wg.Done()
            response, err := http.Get(url)
            fmt.Println(response, err, url)
        }(url)
    }

    wg.Wait()
    fmt.Println("over")
    
    // Output:
    // &{200 OK 200 HTTP/1.1 1 1 map[Accept-Ranges:[bytes] Cache-Control:[max-age=86400] Connection:[Keep-Alive] Content-Length:[81] Content-Type:[text/html] Date:[Thu, 22 Apr 2021 03:19:50 GMT] Etag:["51-47cf7e6ee8400"] Expires:[Fri, 23 A
    // pr 2021 03:19:50 GMT] Last-Modified:[Tue, 12 Jan 2010 13:48:00 GMT] Server:[Apache]] 0xc00003a140 81 [] false false map[] 0xc000044000 <nil>} <nil> http://baidu.com/
    // &{200 OK 200 HTTP/1.1 1 1 map[Access-Control-Allow-Headers:[Origin, X-Requested-With, Content-Type, Acceptfecshop-uuid, fecshop-lang, fecshop-currency, access-token, x-token, authorization] Access-Control-Allow-Methods:[*] Access-Co
    // ntrol-Allow-Origin:[*] Cache-Control:[no-cache, private] Connection:[keep-alive] Content-Type:[text/html; charset=UTF-8] Date:[Thu, 22 Apr 2021 03:19:50 GMT] Etag:[W/"be34c7da7adc79dfee6c76195ba1dbdad7b5bc9b"] Server:[MYBWS/1.1] Set
    // -Cookie:[acw_tc=2760829816190615907855703e97def055d99950b742716bbf6b2d8e6e2288;path=/;HttpOnly;Max-Age=1800 XSRF-TOKEN=eyJpdiI6IjFPZm9TcTYzblc5c0JJMFdSSW5EZ0E9PSIsInZhbHVlIjoiTXhEbHNlbVRXelMwTnR4UE5nY1JsNTRDTEJ4SmUzaFFsQkZTak9nTEtib
    // m5rczF3VlF6bVE1YitjNU5EbzlXMms4bTZLV0RiRTk4WXZMSFBBMFoxQ0V0OUpuYWxwN1ppYmpydjFFUzRQWXVBbktaNW82dFNVXC9BXC9FOG9Qb094VSIsIm1hYyI6IjNmYTMyZDUyYzk0ZWY0ZjJkNzJjOGY3M2FiYWYzZDYwNTA4YjFmZTBiZTljMzI1ZTI1MzY1MGQyZDAxYjQwN2QifQ%3D%3D; expires
    // =Thu, 22-Apr-2021 05:19:50 GMT; Max-Age=7200; path=/ laravel_session=eyJpdiI6IlA2cXcxRHVWYVgxa3VZRnNFY09LbkE9PSIsInZhbHVlIjoiK1ptMGIwOTdZbGp5dlwvRlNMK2pWb3hLVVErV0wxOFBQSmp1dVRkWExcL1VZbm1zVFwvUmRGT0dZcXJlcTZSRmhoWk1hdkJxYU9kUUFrNjB
    // RK3o4cW5TanZVZXZSYjVEN29CNUU2bEVHdHVoUVVESkhJcG1ETDVCS3FSTmtwYkJTcDciLCJtYWMiOiIwMTM3NDg2NTJlNDJkMGFhN2Y0NDA2YzJhYzcyMTQ4MzY1NDU1YzlmMjYwOTUxNjM0ZDJkYjUzYWJmMmMxZTE1In0%3D; expires=Thu, 22-Apr-2021 05:19:50 GMT; Max-Age=7200; path=/
    // ; httponly]] 0xc0001220a0 -1 [chunked] false true map[] 0xc00010a000 0xc00004e0b0} <nil> https://wzapi.myzx.cn/
    // over
}

总结

  1. sync.WaitGroup 更多是 多个goroutine通知一个goroutine,更像是main goroutine等待集合的所有goroutine完成一项任务,多对一
  2. sync.Cond 则更像多个goroutine等待main goroutine的工作完成,一对多,更像是广播形式

使用注意

  1. 以下使用方式是不正确的。
package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    
    // 【不能另外启动协程去执行Add()和Done(),因为main的Wait()不会阻塞等待】
    go func() {
        for i := 0; i < 10; i++ {
            wg.Add(1)	// (&wg).Add(1)	-> (*WaitGroup).Add(&wg, 1)
            go func(i int) {
                defer wg.Done()	// (&wg).Done() -> (*WaitGroup).Done(&wg)
                fmt.Println(i)
            }(i)
        }
    }()
    
    // 这里并不会阻塞等待,因为计数器为0
    wg.Wait()
    
    // Output:
}