之前写过关于互斥锁的内容,
读写锁相较于互斥锁的区别是,它不限制资源的并发读,但是读写、写写操作无法并行执行。
来看看读写锁的结构: /src/sync/rwmutex.go:28
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
比较容易发现
- 读写锁直接内置了一个互斥锁
writerSem
和readerSem
分别是读等待和写等待会用到的信号量readerCount
记录了当前正在执行的读操作的数量readerWait
写操作上锁时需要等待解读锁的读操作数量
1.18版本中,读写锁提供了这些外部方法
RLock
- 上读锁,阻塞式TryRLock
- 上读锁,非阻塞式RUnlock
- 解读锁Lock
- 上写锁,阻塞式TryLock
- 上写锁,非阻塞式Unlock
- 解写锁
先来看看写锁的操作吧:
上写锁/src/sync/rwmutex.go:133
:
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
...
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
①首先是直接调用自带的互斥锁的Lock方法,该行为会阻塞后续的写操作,当然,如果有其他g在之前就已经执行了该方法且还没释放,被阻塞的就是当前g了。
②接下来,调用atomic.AddInt32
方法将rw.readerCount
置为负数,这个操作会阻塞后续的读操作,同时,rw.readerCount
之前的数值保存在了临时变量r中。
③上写锁时,如果还有正在进行的读操作,需要等待它们完成,这就是第三步在做的,r≠0,这代表还有正在进行的读操作,atomic.AddInt32(&rw.readerWait, r) != 0
这一步操作和判断与读锁的慢解锁有关,在读锁解锁时,如果此时写锁已经执行了②,那么就会进入读锁的慢解锁,每一个读锁的慢解锁会rw.readerWait
对进行-1操作,直到当读锁慢解锁次数与r刚好相等时(也就是写锁上锁时正在进行读操作的g数量),rw.readerWait
为0,此时执行该慢解锁的g会负责唤醒在③中阻塞住的上写锁的g。
解写锁/src/sync/rwmutex.go:190
:
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
①第一步就是把之前减去的值给加回来,如果加完之后的值超过了最大值,代表发生了错误
②释放对应次数的信号量,唤醒在写锁持续期间阻塞的上读锁的g
③解开互斥锁
看起来一切都很简单,来看看读锁吧:
上读锁/src/sync/rwmutex.go:56
:
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
去除竞态相关代码后,其实真正核心的代码也就这一小部分,使用atomic.AddInt32使得readerCount+1,如果返回值是负数,代表此时有g获取了写锁,那么就调用runtime_SemacquireMutex陷入休眠等待唤醒。
解读锁/src/sync/rwmutex.go:56
:
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
对rw.readerCount
进行-1操作,如果-1后的值小于0,代表着此时有g正在上写锁,此时就需要进入慢解锁
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
// 未调用上读锁就调用了解读锁
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
慢解锁要做的事情就是对rw.readerWait
进行-1操作,如果-1后为0,代表当前g是最后一个在上写锁期间需要等待的解读锁,那么当前g需要唤醒睡眠中的上写锁的g。