特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64

文件位置:sync/mutex.go

文章所涉及的go汇编代码都是以_amd64结尾的文件

1 前期知识

1.1 阻塞锁

将当前协程阻塞挂起,直到锁被释放后,以回调的方式将阻塞协程重新唤醒,进行锁争夺

1.2 自旋锁

结合CAS,重复校验锁的状态并尝试获取锁,始终占用cpu

1.3 对比

优势 劣势
阻塞/唤醒 不占用cpu时间片 需要上下文切换
自旋 不会阻塞当前协程 占用cpu时间片

2 数据结构

1
2
3
4
type Mutex struct {
state int32
sema uint32
}
  • state:状态表示,第1位表示是否上锁,第2位表示mutex是否被唤醒,第3位表示mutex是否处于饥饿模式
  • sema:控制锁状态的信号量

3 上锁

1
2
3
4
5
6
7
8
9
10
11
func (m *Mutex) Lock() {
// 初次尝试,state为0则直接抢锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 快速抢锁失败,则进入loclSlow慢速抢锁
m.lockSlow()
}

主要上锁逻辑还是在lockSlow()函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 抢锁所等待的时间
starving := false // 是否处于饥饿模式
awoke := false // 是否已有协程在等待锁
iter := 0 // 自旋次数
old := m.state // 锁的state
for {
// 不处于饥饿模式,已经被上锁,并且满足自旋条件
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 将mutexWoken置为1,表明自己正在自旋,先不唤醒其他阻塞的协程(否则自旋无意义)
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋调用相关函数
runtime_doSpin()
// 自旋次数+1
iter++
old = m.state
// 继续自旋
continue
}

其中的CompareAndSwapInt32是一种原子操作,汇编实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 该文件在runtime/atomic_amd64.s
// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
// if(*val == old){
// *val = new;
// return 1;
// } else
// return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17
MOVQ ptr+0(FP), BX
MOVL old+8(FP), AX
MOVL new+12(FP), CX
LOCK
CMPXCHGL CX, 0(BX)
SETEQ ret+16(FP)
RET

判断的逻辑很简单,就是*valold相等,则将*val更新为new并且返回true,否则返回false

判断是否可以自旋的逻辑在runtime/proc.go文件中的sync_runtime_canSpin函数中

1
2
3
4
5
6
7
8
9
10
11
12
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
// active_spin=4
if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
return false
}
if p := getg().m.p.ptr(); !runqempty(p) {
return false
}
return true
}
  • 自旋次数iter大于等于4
  • cpu核数小于等于1

  • 当前P的执行队列中有等待执行的g

满足上述三种条件之一,则不满足自旋条件,不能进入自旋状态

runtime_doSpin的相关实现也在runtime/proc.go下的sync_runtime_doSpin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}
// procyield是用汇编实现的,文件在runtime/asm_amd64.s
// 其中AX的值是active_spin_cnt,并且active_spin_cnt=30
TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET

通过汇编代码可以看出,procyield函数会执行30次的PAUSE指令,该指令会占用CPU并消耗CPU时间。


现在回到lockSlow,继续看剩余的处理逻辑,此时不能再进行自旋操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 复制当前的state状态,用来设置mutex的新状态
new := old
// old state非饥饿模式
// 如果当前协程抢锁成功或者已经是上锁状态的,new state都应该加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// old state处于加锁状态或者饥饿状态,当前协程都会被阻塞,则等待数量+1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// old state已经上锁(表明当前协程在等待),并且处于饥饿状态,则将new state标记饥饿模式
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// 当前协程被唤醒,不管是拿到锁了,还是将要进入阻塞队列了,都应该取消mutexWoken标记
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}

剩余代码逻辑通过CAS来尝试设置锁的状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// old state并没有上锁,也不是饥饿状态
if old&(mutexLocked|mutexStarving) == 0 {
// 当前协程上锁成功
break
}
// 如果已经等待过了,说明是从阻塞队列被唤醒的协程,因为此次上锁失败,放回队列头部
queueLifo := waitStartTime != 0
// 如果是新来抢锁的,则初始化等待时间,并且放到队列尾部
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 通过信号量来排队获取锁,通过sleep原语阻塞当前协程
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// sleep结束,被唤醒
// 如果当前协程等待时间超过1ms,则进入饥饿模式
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 获取锁目前状态(可能被其他协程修改过)
old = m.state
// 如果锁是饥饿模式,则锁应该是被释放的状态,当前协程是被信号量唤醒的,那么锁应该直接交给当前协程
if old&mutexStarving != 0 {
// 如果当前锁是上锁的或者有协程在唤醒状态,或者阻塞队列为空,这与上面条件冲突,是非法状态
// 因为饥饿模式下,锁应该是交给阻塞队列的头部的协程
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前协程获得锁,等待队列-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果只剩一个等待者了,则退出饥饿模式
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
// 更新mutex状态,主要是减去等待者或者退出饥饿模式,并加上上锁状态
atomic.AddInt32(&m.state, delta)
break
}
// 如果锁不是饥饿模式,唤醒当前协程,重置自旋次数
awoke = true
iter = 0
} else {
// CAS失败,重新尝试获取锁
old = m.state
}

现在来看看上面提到的runtime_SemacquireMutex函数,首先是调用runtime/sema.go/poll_runtime_Semacquire函数,接着它又调用semacquire1函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// 抢占到信号量,则直接返回
if cansemacquire(addr) {
return
}
// 没抢到信号量:
// 等待者+1
// 继续尝试抢信号量,如果成功则返回
// 将自己入队
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semtable.rootFor(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
// 加锁
lockWithRank(&root.lock, lockRankRoot)
// 当前等待数量+1
root.nwait.Add(1)
// 尝试获取信号量
if cansemacquire(addr) {
root.nwait.Add(-1)
unlock(&root.lock)
break
}
// 入队
root.queue(addr, s, lifo)
// 当前协程等待,让调度起调用其他的协程
goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

runtime_SemacquireMutex函数的作用就是使用使用信号量保证资源不会被两个协程获取

4 解锁

1
2
3
4
5
6
7
8
9
10
11
12
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 快速解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 不等于0,代表快速解锁失败,开始慢速解锁
m.unlockSlow(new)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (m *Mutex) unlockSlow(new int32) {
// 已经被解锁,则直接抛出异常
if (new+mutexLocked)&mutexLocked == 0 {
fatal("sync: unlock of unlocked mutex")
}
// 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 没有阻塞的协程,直接返回
// 有阻塞的协程但是从处于mutexWoken或者已经被上锁或者处于饥饿模式,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒一个协程,阻塞数量减1,并将mutexWoken置为true
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式,直接将锁交给下一个等待者
runtime_Semrelease(&m.sema, true, 1)
}
}