特别提醒,本文所涉及的源码是go1.21.0 darwin/amd64
文件位置:runtime/slice.go

1 数据结构

1
2
3
4
5
6
7
8
type RWMutex struct {
w Mutex // 写锁
writerSem uint32 // 写操作等待读操作完成
readerSem uint32 // 读操作等待写操作完
readerCount atomic.Int32
readerWait atomic.Int32
}
const rwmutexMaxReaders = 1 << 30
  • readerCount:在没有写锁介入的时候,表示的是当前正在读操作的数量。如果有写介入,则readerCount+rwmutexMaxReaders等于正在读操作的数量
  • readerWait:在能够获取写锁前,还需要等待多少个读协程释放读锁

2 写锁

1
2
3
4
5
6
7
8
9
10
11
12
func (rw *RWMutex) Lock() {
// 首先与其他写锁操作争Mutex
rw.w.Lock()
// 与其他写操作竞争成功
// const rwmutexMaxReaders = 1 << 30
// readerCount减去rwmutexMaxReaders,阻塞后面的读操作,r表示当前正在进行读操作的数量
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
// 前面还有读操作,readerWait+=r,并阻塞挂起,等待唤醒
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}

为什么获取写锁的时候要先减去rwmutexMaxReaders,阻塞后续的读操作呢?

主要是防止后续不断的读操作导致写操作被「饿死」

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (rw *RWMutex) Unlock() {
// 释放写锁
r := rw.readerCount.Add(rwmutexMaxReaders)
// 如果之前就没加过写锁,直接报错
if r >= rwmutexMaxReaders {
race.Enable()
fatal("sync: Unlock of unlocked RWMutex")
}
// 唤醒readerCount个读操作
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放写锁
rw.w.Unlock()
}

3 读锁

1
2
3
4
5
6
7
func (rw *RWMutex) RLock() {
// readerCount+1,如果小于0,说明有写锁,因为写锁会减去rwmutexMaxReaders
if rw.readerCount.Add(1) < 0 {
// 阻塞挂起,等待写锁的释放
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}
1
2
3
4
5
6
7
func (rw *RWMutex) RUnlock() {
// readerCount-1,如果大于等于0直接释放成功
if r := rw.readerCount.Add(-1); r < 0 {
// 如果小于0,说明前面有协程在等待获取写锁,则会进入rUnlockSlow方法中
rw.rUnlockSlow(r)
}
}
1
2
3
4
5
6
7
8
9
10
11
func (rw *RWMutex) rUnlockSlow(r int32) {
// 之前没有加过读锁,因此释放读锁会直接报错
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
fatal("sync: RUnlock of unlocked RWMutex")
}
// readerWait-1等于0,说明该读操作是最后一个读操作,释放完它后,应该唤醒写操作
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}

4 Try

rwmutex.go文件中除了上面提到的函数,还有两个Try函数,该函数只在特定场景下使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
// 如果有写操作正在执行,则返回false
if c < 0 {
return false
}
// 双重校验,再检查一下是否中间的过程中,有其他协程介入,没有则返回true
// 否则继续循环
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
func (rw *RWMutex) TryLock() bool {
// 尝试获取写锁失败,则返回false
if !rw.w.TryLock() {
return false
}
// 判断是否有读操作在进行,有则返回false,并释放写锁
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
// 释放写锁
rw.w.Unlock()
return false
}
return true
}