之前写过关于互斥锁的内容,

Golang互斥锁-Mutex
日期: 2022-12-27   标签: #golang  #锁  #源码解析 
互斥锁结构 // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 sema uint32 } state表示当前互斥锁的状态, sema是用于控制锁状态的信号量 在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态: mutexLocked — 表示互斥锁的锁定状态; mutexWoken — 唤醒模式,此时释放锁的g不会唤醒休眠的g; mutexStarving — 当前的互斥锁进入饥饿状态; waitersCount — 当前互斥锁上等待的 Goroutine 个数; 正常模式和饥饿模式的区别: 在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。 ......
互斥锁是最基础的锁,你可以在任何需要锁的场景使用它,但是它不是万能的,在某些特定场景下,我们可以用其他的锁来获得更好的性能,例如读多写少的场景,就更适合读写锁,这也是这篇文章的主题。

读写锁相较于互斥锁的区别是,它不限制资源的并发读,但是读写、写写操作无法并行执行。

来看看读写锁的结构: /src/sync/rwmutex.go:28

type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // semaphore for writers to wait for completing readers
	readerSem   uint32 // semaphore for readers to wait for completing writers
	readerCount int32  // number of pending readers
	readerWait  int32  // number of departing readers
}

比较容易发现

  • 读写锁直接内置了一个互斥锁
  • writerSemreaderSem 分别是读等待和写等待会用到的信号量
  • readerCount记录了当前正在执行的读操作的数量
  • readerWait写操作上锁时需要等待解读锁的读操作数量

1.18版本中,读写锁提供了这些外部方法

  • RLock - 上读锁,阻塞式
  • TryRLock- 上读锁,非阻塞式
  • RUnlock - 解读锁
  • Lock - 上写锁,阻塞式
  • TryLock - 上写锁,非阻塞式
  • Unlock - 解写锁

先来看看写锁的操作吧:

上写锁/src/sync/rwmutex.go:133:

// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
func (rw *RWMutex) Lock() {
	...
	// First, resolve competition with other writers.
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	...
}

①首先是直接调用自带的互斥锁的Lock方法,该行为会阻塞后续的写操作,当然,如果有其他g在之前就已经执行了该方法且还没释放,被阻塞的就是当前g了。

②接下来,调用atomic.AddInt32方法将rw.readerCount置为负数,这个操作会阻塞后续的读操作,同时,rw.readerCount之前的数值保存在了临时变量r中。

③上写锁时,如果还有正在进行的读操作,需要等待它们完成,这就是第三步在做的,r≠0,这代表还有正在进行的读操作,atomic.AddInt32(&rw.readerWait, r) != 0 这一步操作和判断与读锁的慢解锁有关,在读锁解锁时,如果此时写锁已经执行了②,那么就会进入读锁的慢解锁,每一个读锁的慢解锁会rw.readerWait对进行-1操作,直到当读锁慢解锁次数与r刚好相等时(也就是写锁上锁时正在进行读操作的g数量),rw.readerWait为0,此时执行该慢解锁的g会负责唤醒在③中阻塞住的上写锁的g。

解写锁/src/sync/rwmutex.go:190:

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
func (rw *RWMutex) Unlock() {
	...

	// Announce to readers there is no active writer.
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	rw.w.Unlock()
	...
}

①第一步就是把之前减去的值给加回来,如果加完之后的值超过了最大值,代表发生了错误

②释放对应次数的信号量,唤醒在写锁持续期间阻塞的上读锁的g

③解开互斥锁

看起来一切都很简单,来看看读锁吧:

上读锁/src/sync/rwmutex.go:56:

func (rw *RWMutex) RLock() {
	...
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	...
}

去除竞态相关代码后,其实真正核心的代码也就这一小部分,使用atomic.AddInt32使得readerCount+1,如果返回值是负数,代表此时有g获取了写锁,那么就调用runtime_SemacquireMutex陷入休眠等待唤醒。

解读锁/src/sync/rwmutex.go:56:

// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
	...
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	...
}

rw.readerCount进行-1操作,如果-1后的值小于0,代表着此时有g正在上写锁,此时就需要进入慢解锁

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		// 未调用上读锁就调用了解读锁
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

慢解锁要做的事情就是对rw.readerWait进行-1操作,如果-1后为0,代表当前g是最后一个在上写锁期间需要等待的解读锁,那么当前g需要唤醒睡眠中的上写锁的g。