特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64
文件位置:sync/mutex.go
文章所涉及的Go汇编代码均取自以_amd64.s结尾的文件
1 前期知识
1.1 阻塞锁
将当前协程阻塞挂起,直到锁被释放后,再以回调的方式唤醒被阻塞的协程,使其重新参与锁的争夺
1.2 自旋锁
结合CAS操作,循环检查锁的状态并尝试获取锁,期间始终占用CPU
1.3 对比
| | 优势 | 劣势 |
| --- | --- | --- |
| 阻塞/唤醒 | 不占用CPU时间片 | 需要上下文切换 |
| 自旋 | 不会阻塞当前协程 | 占用CPU时间片 |
2 数据结构
1 2 3 4
| type Mutex struct {
	// state packs the lock's flags and waiter count:
	//   bit 1 - mutexLocked:   the mutex is held
	//   bit 2 - mutexWoken:    a waiter has already been woken
	//   bit 3 - mutexStarving: starvation mode
	// The remaining high bits (state>>mutexWaiterShift) count goroutines
	// waiting for the lock.
	state int32
	// sema is the semaphore that waiting goroutines park on
	// (runtime_SemacquireMutex) and are woken through (runtime_Semrelease).
	sema  uint32
}
|
- state:锁的状态字。第1位(`mutexLocked`)表示是否已上锁,第2位(`mutexWoken`)表示是否已有被唤醒的协程,第3位(`mutexStarving`)表示mutex是否处于饥饿模式;其余高位(即`state>>mutexWaiterShift`)记录正在等待该锁的协程数量
- sema:用于阻塞和唤醒等待协程的信号量
3 上锁
1 2 3 4 5 6 7 8 9 10 11
| func (m *Mutex) Lock() {
	// Lock acquires m.
	//
	// Fast path: a single CAS from 0 (unlocked, no waiters, no flags set)
	// straight to mutexLocked.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// Race-detector bookkeeping, guarded by race.Enabled.
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Any non-zero state (locked, waiters queued, woken or starving flag
	// set) goes to the slow path, which may spin and/or park on m.sema.
	m.lockSlow()
}
|
快速路径的CAS失败后,主要的上锁逻辑都在`lockSlow()`函数中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue }
|
其中的`CompareAndSwapInt32`是一种原子操作,在amd64上最终由`runtime/internal/atomic`中的如下汇编实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
TEXT ·Cas(SB),NOSPLIT,$0-17
	// bool Cas(int32 *val, int32 old, int32 new)
	// Argument frame: 8-byte ptr + 4-byte old + 4-byte new + 1-byte result = 17.
	MOVQ ptr+0(FP), BX     // BX = address to update
	MOVL old+8(FP), AX     // AX = expected value (CMPXCHG compares against AX implicitly)
	MOVL new+12(FP), CX    // CX = replacement value
	LOCK                   // make the following compare-and-exchange atomic
	CMPXCHGL CX, 0(BX)     // if *val == AX { *val = CX; ZF = 1 } else { AX = *val; ZF = 0 }
	SETEQ ret+16(FP)       // return ZF as the bool result
	RET
|
判断逻辑很简单:如果`*val`与`old`相等,则将`*val`更新为`new`并返回`true`,否则返回`false`。`LOCK`前缀保证了`CMPXCHGL`的比较与写入是一个不可分割的原子操作。
判断能否自旋的逻辑位于`runtime/proc.go`文件的`sync_runtime_canSpin`函数中:
1 2 3 4 5 6 7 8 9 10 11 12
|
func sync_runtime_canSpin(i int) bool {
	// sync_runtime_canSpin reports whether the caller (sync.Mutex's
	// lockSlow, which passes its iteration count i) may spin once more
	// instead of blocking.
	//
	// Do not spin when:
	//   - i has reached the active_spin limit;
	//   - there is at most one CPU, so the lock holder cannot be running in
	//     parallel with us;
	//   - gomaxprocs minus idle Ps minus spinning Ms minus our own leaves no
	//     other P doing work, so presumably nobody is about to release the lock.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
		return false
	}
	// Also do not spin if the current P has goroutines waiting in its local
	// run queue; spinning would only delay them.
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}
|
- 自旋次数`iter`大于等于`active_spin`(即4)
- CPU核数小于等于1
- 除当前P之外没有其他正在运行的P(`gomaxprocs <= 空闲P数 + 自旋M数 + 1`)
- 当前P的本地运行队列中有等待执行的g

满足上述四种条件之一,就不满足自旋条件,不能进入自旋状态
`runtime_doSpin`的实现同样在`runtime/proc.go`中,对应`sync_runtime_doSpin`:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
func sync_runtime_doSpin() {
	// Busy-wait for active_spin_cnt PAUSE iterations (procyield) without
	// yielding the CPU.
	procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	// procyield(cycles): execute PAUSE `cycles` times as a busy-wait.
	MOVL cycles+0(FP), AX  // AX = remaining iterations (32-bit)
again:
	PAUSE                  // spin-wait hint to the CPU
	SUBL $1, AX            // AX--
	JNZ again              // repeat until AX reaches zero
	RET
|
通过汇编代码可以看出,`procyield`函数会执行30次(`active_spin_cnt`)`PAUSE`指令。`PAUSE`只是提示CPU当前处于自旋等待循环,协程在此期间并不会让出CPU,因此仍会持续消耗CPU时间。
现在回到`lockSlow`,继续看剩余的处理逻辑。当锁不处于“已上锁且非饥饿”的状态,或者不再满足自旋条件时,开始根据`old`计算期望的新状态`new`:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| new := old
if old&mutexStarving == 0 { new |= mutexLocked }
if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift }
if starving && old&mutexLocked != 0 { new |= mutexStarving }
if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
|
剩余代码通过CAS尝试将`m.state`从`old`更新为`new`:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break } queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state }
|
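现在来看看上面提到的`runtime_SemacquireMutex`函数。它通过`go:linkname`对应到`runtime/sema.go`中的`sync_runtime_SemacquireMutex`,后者直接调用`semacquire1`,并传入`lifo`参数以及block/mutex两种profile标志。下面列出的`poll_runtime_Semacquire`是供`internal/poll`使用的同类入口(固定`lifo=false`),二者最终都进入`semacquire1`: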
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| func poll_runtime_Semacquire(addr *uint32) {
	// Thin wrapper around semacquire1: acquire the semaphore at addr with
	// lifo=false, block profiling only (semaBlockProfile), no extra skipped
	// frames, and waitReasonSemacquire as the park reason.
	// NOTE(review): judging by its name this is the internal/poll entry point;
	// sync.Mutex's runtime_SemacquireMutex appears to be wired to a different
	// wrapper in runtime/sema.go that forwards its own lifo flag — verify.
	semacquire1(addr, false, semaBlockProfile, 0, waitReasonSemacquire)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason) {
	// semacquire1 acquires the semaphore at addr, parking the calling
	// goroutine until it succeeds.
	//   lifo       - forwarded to root.queue; per its name, selects LIFO
	//                rather than FIFO placement of this waiter
	//   profile    - which of block / mutex profiling to record for this wait
	//   skipframes - extra stack frames to skip when recording
	//   reason     - wait reason passed to goparkunlock while parked
	gp := getg()
	// Must be called on the goroutine's own stack.
	if gp != gp.m.curg {
		throw("semacquire not on the G stack")
	}

	// Fast path: acquire without queueing.
	if cansemacquire(addr) {
		return
	}

	// Slow path: represent this waiter with a sudog.
	s := acquireSudog()
	root := semtable.rootFor(addr)
	t0 := int64(0)
	s.releasetime = 0
	s.acquiretime = 0
	s.ticket = 0
	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
		t0 = cputicks()
		// Presumably asks the releaser to stamp the release time — TODO confirm.
		s.releasetime = -1
	}
	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
		if t0 == 0 {
			t0 = cputicks()
		}
		s.acquiretime = t0
	}
	for {
		lockWithRank(&root.lock, lockRankRoot)
		// Register as a waiter before re-checking, so a concurrent release
		// can see that someone is waiting.
		root.nwait.Add(1)
		// Re-check under root.lock to avoid missing a release that happened
		// before we were registered.
		if cansemacquire(addr) {
			root.nwait.Add(-1)
			unlock(&root.lock)
			break
		}
		// Enqueue and park; per its name, goparkunlock also releases root.lock.
		root.queue(addr, s, lifo)
		goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
		// After wake-up: s.ticket != 0 presumably means the releaser handed
		// the semaphore to us directly; otherwise try to take it, and if that
		// fails, loop and wait again.
		if s.ticket != 0 || cansemacquire(addr) {
			break
		}
	}
	// Emit a block-profile event only if a release time was recorded.
	if s.releasetime > 0 {
		blockevent(s.releasetime-t0, 3+skipframes)
	}
	releaseSudog(s)
}
|
`runtime_SemacquireMutex`的作用是在信号量`m.sema`上获取资源:若暂时无法获取,就把当前协程加入等待队列并挂起,直到被`runtime_Semrelease`唤醒。`queueLifo`为`true`时(即该协程此前已经等待过)会被插入到等待队列头部,从而优先被唤醒。
4 解锁
1 2 3 4 5 6 7 8 9 10 11 12
| func (m *Mutex) Unlock() {
	// Unlock releases m.
	//
	// Race-detector bookkeeping, guarded by race.Enabled.
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: clear the locked bit. If the resulting state is 0 there are
	// no waiters and no flags, so nothing else needs to happen.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Slow path: wake/hand off to a waiter, or report unlocking an
		// already-unlocked mutex (see unlockSlow).
		m.unlockSlow(new)
	}
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func (m *Mutex) unlockSlow(new int32) {
	// unlockSlow finishes Unlock when the state was non-zero after clearing
	// the locked bit; new is that post-decrement state.
	//
	// Adding mutexLocked back restores the pre-Unlock state. If its locked
	// bit was clear, the mutex was not locked to begin with.
	if (new+mutexLocked)&mutexLocked == 0 {
		fatal("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		// Normal mode: wake at most one waiter, and only when needed.
		old := new
		for {
			// Nothing to do if there are no waiters, or if the lock has
			// already been re-acquired, a waiter is already woken, or the
			// mutex switched to starvation mode.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Remove one waiter from the count and set mutexWoken, so a
			// concurrent Unlock hits the early return above instead of
			// waking a second waiter.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// State changed under us; reload and retry.
			old = m.state
		}
	} else {
		// Starvation mode: the second argument true presumably requests a
		// direct handoff to the next waiter. The woken waiter sets
		// mutexLocked itself (lockSlow's mutexStarving branch).
		runtime_Semrelease(&m.sema, true, 1)
	}
}
|