WaitGroup
從源碼分析 wait group
介紹WaitGroup
使用WaitGroup的三個重要方法
- Add(delta int) 給WaitGroup的計數器增加一個值,當WaitGroup的計數值減少到零時(這邊的計數是Wait的不是Add),阻塞在Wait上的Goroutine就不會阻塞,但計數器為負數時就會panic
- Done() 表示一個goroutine完成任務,WaitGroup減少1 (這邊是Add不是Wait)
- Wait() 此方法調用者會被阻塞,直到WaitGroup的計數值減小到0
WaitGroup標準使用
避免意外觸發panic
package main
import (
"fmt"
"sync"
)
func main(){
var wg sync.WaitGroup
wg.Add(3) //第一步
for i := 0;i<3;i++{
go func () {
defer wg.Done() //第二步,在goroutine內加上這個調用
...
}()
}
wg.Wait() //第三步,在主goroutine內調用Wait
}
分析WaitGroup結構
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // 高32位為計數器的值,低32位為waiter的數量
sema uint32 // 信號量
- noCopy為輔助字段,主要用於輔助檢查工具vet
- 使用atomic.Uint64保證64位對齊,所以state能夠紀錄計數器的值和waiter的數量
- sema 用來喚醒waiter的信號量
分析 noCopy字段
type noCopy struct{}
// Lock is a no-op used by -copylocks checker from `go vet`.
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
主要是讓noCopy字段實現Locke接口,因為noCopy是未輸出類型,因此不會調用到Lock或UnLock方法
分析Add方法
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter") //計數器不該 < 0
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state { //檢查是否還有waiter在等待,如果有不應該再併發調用Add
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
state := wg.state.Add(uint64(delta) << 32) 計數器加delta值
v := int32(state >> 32) 右移32位,只保留計數器的值
w := uint32(state) waiter的數量
if w != 0 && delta > 0 && v == int32(delta)
檢查是否還有waiter在等待,如果有不應該再併發調用Addwg.state.Store(0)
將計數器的值變為0,準備喚醒waiter
分析Wait
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
- 會先嘗試檢查計數器的值v
- v為0:不會發生阻塞,直接返回
- v不為0:原子操作state,把goroutine加入到waiter中
- 成功:被阻塞喚醒
- 失敗:進行循環檢查(因為可能同時有多個waiter調用Wait方法)
- 如果阻塞的Waiter被喚醒,理論上state的值為0(從Add方法實現可以看到,先清0再喚醒waiter),那就直接返回就好,因為state為0代表計數器的值也為0