特别提醒,本文所涉及的源码是go1.22.2 darwin/amd64
x/sync包的版本是v0.7.0
x 目录(golang.org/x)下的包一般被认为是 Go 的官方扩展库。
singleflight
定义:Package singleflight provides a duplicate function call suppression mechanism
意思是:它提供了一种抑制重复函数调用的机制——对同一个 key 的并发调用只会真正执行一次,其余调用等待并共享这次执行的结果。
一般有如下两种应用场景,能减少缓存击穿的风险。

现在来看看具体的源码
3个结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
// call is a single in-flight or completed invocation tracked by a Group.
type call struct {
	// wg is incremented when the call is created and released by doCall
	// once fn has finished; duplicate callers in Do wait on it.
	wg sync.WaitGroup

	// val and err hold what fn returned. doCall may replace err with a
	// marker describing a panic or a runtime.Goexit inside fn.
	val interface{}
	err error

	// dups counts the callers that joined this call after it was created.
	// chans lists the channels registered through DoChan that receive the
	// Result. Do and DoChan modify both only while holding Group.mu.
	dups  int
	chans []chan<- Result
}

// Group is a namespace of keys within which duplicate calls are suppressed.
type Group struct {
	mu sync.Mutex       // guards m
	m  map[string]*call // calls currently in flight, by key; created on first use
}

// Result bundles the outcome of a call so it can be sent over a channel.
type Result struct {
	Val    interface{} // value returned by fn
	Err    error       // error returned by fn
	Shared bool        // true when the outcome was delivered to more than one caller
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait()
if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false defer func() { if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() if g.m[key] == c { delete(g.m, key) } if e, ok := c.err.(*panicError); ok { if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { } else {
for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }()
func() { defer func() { if !normalReturn { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } }
|
DoChan 相比于 Do 的区别是:它属于异步调用,会立即返回一个 channel,调用方稍后再从该 channel 中读取结果,从而解决同步调用时的阻塞问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock()
go g.doCall(c, key, fn)
return ch }
|
errgroup 主要用于协调多个 goroutine 的并发执行,并且能够返回其中出现的第一个非 nil 错误,这是 WaitGroup 所不能做到的事情。
1 2 3 4 5 6 7 8 9 10
| type Group struct { cancel func(error) wg sync.WaitGroup
sem chan token errOnce sync.Once err error }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| func (g *Group) Go(f func() error) { if g.sem != nil { g.sem <- token{} } g.wg.Add(1) go func() { defer g.done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel(g.err) } }) } }() }
func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel(g.err) } return g.err }
|