源码级剖析Go标准库中的sync.RWMutex。

概述

RWMutex,读写锁,又称“读写互斥锁”。
读写锁简单来说就是可以由任意数量的读者同时使用,或者只由一个写者使用的锁。

读写锁和互斥量(Mutex)类似,但是比起互斥量有着更高的并行性,它允许多个读者同时读取,因此有一些特殊的应用场景。
在并发编程的很多场景下,数据的读取可能比写入更加频繁,这时就要允许多个线程同时读取一块内容。

用例

Go中,RWMutex的零值是一个未加锁的互斥量。

RWMutex使用起来相对比较简单,这里举一个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"fmt"
"sync"
"time"
)

func main() {
rw := new(sync.RWMutex)
for i := 0; i < 2; i++ { // 建立两个写者
go func() {
for j := 0; j < 3; j++ {
rw.Lock()
// 写
rw.Unlock()
}
}()
}
for i := 0; i < 5; i++ { // 建立两个读者
go func() {
for j := 0; j < 3; j++ {
rw.RLock()
// 读
rw.RUnlock()
}
}()
}
time.Sleep(time.Second)
fmt.Println("Done")
}

PlayGround

一个(神奇)优秀的(大坑)特性

读者在读的时候,不能够假定别的读者也能够获得锁。因此,禁止读锁嵌套。

是不是有点儿绕?下面举个“七秒例”:🌰

  • 第一秒:读者1在第1秒成功申请了读锁
  • 第二秒:写者1在第2秒申请写锁,申请失败,阻塞,但它会防止新的读者获锁
  • 第三秒:读者2在第3秒申请读锁,申请失败
  • 第四秒:读者1释放读锁,写者1获得写锁
  • 第五秒:写者1释放写锁,读者2获得读锁
  • 第六秒:读者1再次申请读锁,申请成功,与读者2共享
  • 第七秒:读者1、读者2释放读锁,结束

当写锁阻塞时,新的读锁是无法申请的,这可以有效防止写者饥饿。如果一个线程因为某种原因,导致得不到CPU运行时间,这种状态被称之为 **饥饿**。

然而,这种机制也禁止了读锁嵌套。读锁嵌套可能造成死锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"fmt"
"sync"
"time"
)

func main() {
rw := new(sync.RWMutex)

var deadLockCase time.Duration = 1

go func() {
time.Sleep(time.Second * deadLockCase)
fmt.Println("Writer Try")
rw.Lock()
fmt.Println("Writer Fetch")
time.Sleep(time.Second * 1)
fmt.Println("Writer Release")
rw.Unlock()
}()
fmt.Println("Reader 1 Try")
rw.RLock()
fmt.Println("Reader 1 Fetch")
time.Sleep(time.Second * 2)
fmt.Println("Reader 2 Try")
rw.RLock()
fmt.Println("Reader 2 Fetch")
time.Sleep(time.Second * 2)
fmt.Println("Reader 1 Release")
rw.RUnlock()
time.Sleep(time.Second * 1)
fmt.Println("Reader 2 Release")
rw.RUnlock()
time.Sleep(time.Second * 2)
fmt.Println("Done")
}

读者1和读者2是嵌套关系,按照这种时间安排,上述程序会导致死锁。

而有些死锁的可怕之处就在于,它不一定会发生。假设上面程序中的time.Sleep都是随机的时间,那么这一段代码每次的结果有可能不一致,这会给Debug带来极大的困难。

__吾闻读锁莫嵌套,写锁嵌套长已矣__。(读锁嵌套了还有概率成功,写锁嵌套了100%完蛋🏥)

源码剖析

(源码具体内容、行数,以版本go version 1.8.1为例。)

为了方便理解,可以把所有的if race.Enabled {...}扔掉不看。接下来,我们重述“七秒例”。🌰

第一秒,读者1请求读锁。

1
2
3
4
5
Line41: 
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_Semacquire(&rw.readerSem)
}

读者数量readerCount开始是0,这个时候加1,变成了1,不符合判负条件所以跳出,成功获得读锁一枚。

第二秒,写者尝试获取写锁。第85行获取w的锁。不管这个读写锁有没有获取成功,先排斥别的写者。

1
2
3
4
5
6
7
8
9
Line85:
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_Semacquire(&rw.writerSem)
}

刚才说了,一个写者阻塞在这里的时候,也不会让新的读者去读了,所以它干了一件非常坏的事情:
把readerCount变成了1-rwmutexMaxReaders。
这样就能卡住新来的读者了。
接下来,算出r等于1。这意味着有当前有写者存在。
因为有读者,所以写者卡在了信号量writerSem上。但是它不甘心啊,心想“等完现在的这几个读者,我就要去写!”,它默默地把现在占有读锁的人记在了小本本rw.readerWait上。在本例子中,readerWait被设置为了1。

第三秒,读者2尝试获得读锁,它又来到了第41行,结果发现读者的数量是1-rwmutexMaxReaders,好吧,它只好卡在信号量readerSem上。

第四秒,读者1调用RUnlock(),它首先把读者数量减一,毕竟自己已经不读了。

1
2
3
4
5
6
7
8
Line61:
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem)
}
}

在读者数量减一的时候,它发现读者数量是负数,这回读者1明白了,有一个写者在等待写。估计读者1自己已经在这个写者的小本本readerWait上了,因此它把readerWait减一,表示自己不读了。这时候读者1发现自己就是最后一个读者了,所以赶紧祭出writerSem,让写者可以去写。
读者1释放了writerSem信号量以后,写者很快就收到了这个提醒,兴高采烈地获得了写锁,开始自己的写作生涯。

读者2还卡着呢…

第五秒,写者1写完了一稿便不想写了,调用Unlock()准备释放读锁。

1
2
3
4
5
6
7
8
Line114:
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)

// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem)
}

只见他重新为readerCount加上rwmutexMaxReaders,使他重新变为了正数。这个正数恰好也是阻塞的读者的数量。
接下来,写者按照这个读者的数量,释放了这么多的readerSem信号量,相当于将所有阻塞的读者一一唤醒。读者2在收到readerSem的那一刻喜极而泣,它终于可以读了。

第六秒,读者1又来了,它把读者数量加1,发现它是正数哎,写者现在又没来,它再次幸运地瞬间获得读锁,与读者2一起读了起来。

第七秒,读者1和读者2都释放了自己的读锁。至此,结束。

名词解释

中文 英文 解释
信号量 (也称信号灯) Semaphore
条件变量 Condition
互斥量 Mutex

参考文献

  1. Wikipedia: Semaphore (programming)