特别提醒,本文所涉及的源码是go1.22.2 darwin/amd64
x/sync包的版本是v0.7.0

x目录下的包一般认为是go的官方拓展库

singleflight

定义:Package singleflight provides a duplicate function call suppression mechanism

意思就是提供了一个重复的函数调用抑制机制

一般有如下两种应用场景,能减少缓存击穿的风险。

image-20240418173405122

现在来看看具体的源码

3个结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
type call struct {
wg sync.WaitGroup

// 存放fn函数调用的返回值
val interface{}
err error

// 合并的请求数
dups int
// 调用DoChan的返回结果
chans []chan<- Result
}

type Group struct {
mu sync.Mutex // 互斥访问m
m map[string]*call // 懒加载
}
// 用于DoChan调用
type Result struct {
Val interface{}
Err error
Shared bool
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 对于同样的key只会执行一次fn函数调用
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
// 加锁
g.mu.Lock()
// 初始化
if g.m == nil {
g.m = make(map[string]*call)
}
// 发现已经被调用
if c, ok := g.m[key]; ok {
// 参数更新,等待
c.dups++
g.mu.Unlock()
c.wg.Wait()

if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
// 第一次调用,则新建,添加进map
c := new(call)
c.wg.Add(1)
g.m[key] = c
// 解释
g.mu.Unlock()
// 执行fn的调用
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
// 使用双延迟机制来区分panic和runtime.Goexit
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
// 删除key
if g.m[key] == c {
delete(g.m, key)
}
// 如果是panic,往外抛
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {}
} else {
panic(e)
}
} else if c.err == errGoexit {
// goexit则直接退出
} else {

for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()

func() {
defer func() {
if !normalReturn {
c.err = newPanicError(r)
}
}
}()
// 调用fn函数
c.val, c.err = fn()
normalReturn = true
}()
// 捕获错误
if !normalReturn {
recovered = true
}
}

DoChan相比于Do是它属于异步调用,返回一个channel,解决同步调用时的阻塞问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

go g.doCall(c, key, fn)

return ch
}

errorgroup

主要用于协调多个goroutine的并发执行,并且能够可以返回错误,这是WaitGroup所不能做到的事情

1
2
3
4
5
6
7
8
9
10
type Group struct {
// context的cancel方法
cancel func(error)
wg sync.WaitGroup

sem chan token
// 保证只处理一次错误
errOnce sync.Once
err error
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 调用这个并发执行
func (g *Group) Go(f func() error) {
if g.sem != nil {
g.sem <- token{}
}
g.wg.Add(1)
go func() {
defer g.done()
// 执行业务逻辑f()
if err := f(); err != nil {
// 如果发生错误,则调用这个只执行一次
g.errOnce.Do(func() {
g.err = err
// 如果有cancel函数,则调用cancel
if g.cancel != nil {
g.cancel(g.err)
}
})
}
}()
}
// 等待所有g完成
func (g *Group) Wait() error {
// 等待所有g完成
g.wg.Wait()
if g.cancel != nil {
g.cancel(g.err)
}
// 返回是否有错误
return g.err
}