互斥锁结构


// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}

 state表示当前互斥锁的状态, sema是用于控制锁状态的信号量

state

在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

  • mutexLocked — 表示互斥锁的锁定状态;

  • mutexWoken — 唤醒模式,此时释放锁的g不会唤醒休眠的g;

  • mutexStarving — 当前的互斥锁进入饥饿状态;

  • waitersCount — 当前互斥锁上等待的 Goroutine 个数;

  • 正常模式和饥饿模式的区别:

    在正常模式下,锁的等待者会按照先进先出的顺序获取锁。但是刚被唤起的 Goroutine 与新创建的 Goroutine 竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。

    在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

    与饥饿模式相比,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。

加锁和解锁


加锁

/src/sync/mutex.go:72

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	// 尝试使用CAS操作将state置为mutexLocked状态
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		// 竞态代码可忽略
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	// 竞争🔐
	m.lockSlow()
}

lockSlow()方法,/src/sync/mutex.go:108:

该方法的主体是一个非常大 for 循环,这里将它分成几个部分介绍获取锁的过程:

  1. 判断当前 Goroutine 能否进入自旋;
  2. 通过自旋等待互斥锁的释放;
  3. 计算互斥锁的最新状态;
  4. 更新互斥锁的状态并获取锁;
func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}

		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 更新old
			old = m.state
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

先看这一段:

// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
	// Active spinning makes sense.
	// Try to set mutexWoken flag to inform Unlock
	// to not wake other blocked goroutines.
	// 非唤醒状态 && 唤醒位不为1 && old右移三位!=0,即等待的g数不为0 && CAS置唤醒位为1
	if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
		atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
		awoke = true
	}
	runtime_doSpin()
	iter++
	old = m.state
	continue
}

old&(mutexLocked|mutexStarving) == mutexLocked 只有在old为1即mutexLocked时为true,也就是此时锁必须是非饥饿模式且已上锁,runtime_canSpin的具体实现在/src/runtime/proc.go:6088

const (
	active_spin     = 4
)
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

runtime_canSpin返回true的条件有这些:

  • 当前Goroutine尝试次数不得大于4(active_spin)
  • 当前运行机器是多核CPU
  • GOMAXPROCS>1
  • 至少有一个其他正在运行的 P 并且本地 runq 为空。

满足条件的话,会执行runtime_doSpin(/src/runtime/proc.go:6105),procyield(/src/runtime/asm_amd64.s:729),会执行30次的PAUSE指令。

处理了自旋逻辑后,接下来,根据上下文计算出互斥锁的状态:

new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
// 如果是饥饿状态,不再尝试抢锁,此时只有那些在阻塞中的g能够拿锁
if old&mutexStarving == 0 {
	// 如果不是饥饿状态,当前g抢到🔐的话,state应该是上锁状态
	new |= mutexLocked
}
// 如果old处于饥饿或者上锁状态,即不为正常解锁状态,则当前g这次循环是拿不到锁的,将等待数+1
if old&(mutexLocked|mutexStarving) != 0 {
	// mutexLocked位或者mutexStarving不为0,等待锁的g数量+1
	new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
	// 上一次循环设置了该g的饥饿标志,并且锁仍未解开,则设置饥饿模式
	new |= mutexStarving
}

if awoke {
	// The goroutine has been woken from sleep,
	// so we need to reset the flag in either case.
	if new&mutexWoken == 0 {
		throw("sync: inconsistent mutex state")
	}
	new &^= mutexWoken
}

尝试将算出的状态进行赋值,赋值成功并不代表拿锁成功,还需要old不在mutexLockedmutexStarving两种状态中,拿锁逻辑:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// CAS成功的当前g会将state设置为了lock状态,如果old为正常解锁状态,意味着抢到了锁
	if old&(mutexLocked|mutexStarving) == 0 {
		break // locked the mutex with CAS
	}
	// If we were already waiting before, queue at the front of the queue.
	// 如果waitStartTime不为0,表示之前已经尝试过取锁了
	queueLifo := waitStartTime != 0
	if waitStartTime == 0 {
		waitStartTime = runtime_nanotime()
	}
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	// 判断是否处于饥饿模式
	if old&mutexStarving != 0 {
		// If this goroutine was woken and mutex is in starvation mode,
		// ownership was handed off to us but mutex is in somewhat
		// inconsistent state: mutexLocked is not set and we are still
		// accounted as waiter. Fix that.
		if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
			throw("sync: inconsistent mutex state")
		}
		
		delta := int32(mutexLocked - 1<<mutexWaiterShift)
		// 解除饥饿状态的情况有两种
		// 1. starving为false,即当前g的等待时间小于1ms
		// 2. 目前只有当前g在等🔐
		if !starving || old>>mutexWaiterShift == 1 {
			// Exit starvation mode.
			// Critical to do it here and consider wait time.
			// Starvation mode is so inefficient, that two goroutines
			// can go lock-step infinitely once they switch mutex
			// to starvation mode.
			delta -= mutexStarving
		}
		atomic.AddInt32(&m.state, delta)
		break
	}
	// 非饥饿模式
	awoke = true
	iter = 0
} else {
	// 未拿到🔐,重新循环
	old = m.state
}

g在唤醒后,会立马检查是否处于饥饿模式,如果不在饥饿模式,设置唤醒,重置iter,进入下次循环。如果当前饥饿状态,会直接拿到锁,根据条件判断是否需要解除饥饿状态。

流程图:

流程图

总结一下其中的关键点:

自旋:自旋能够让当前g占住cpu,而不会被抢占,但是抢占是有条件的。解释一下设立这些条件的原因:

  1. 机器cpu核心数>1,如果没有多核cpu,当前g在自旋的过程中,持有锁的g也无法运行,也就不会将锁释放,当前g的自旋是完全无用的。
  2. 当前锁在上锁状态,且不能是饥饿状态,因为饥饿状态下,所有新来的g都必须排队,只有那些正在阻塞的g,在被唤醒时有机会取锁。

new和old: new是old的拷贝,最终要对锁的state进行CAS覆盖。new的值要根据old的值计算,成功将new覆盖state的g不一定是成功拿锁的,但是没设置成功的,只能进入下次的循环。

解锁

/src/sync/mutex.go:203

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	// 注意这里已经将mutexLocked位置0了
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

state减去1后,如果不为0,则快速解锁失败,进入unlockSlow方法。new为0的唯一条件是state为1,即仅有state的锁位为1,此时m.state==mutexLocked

慢解锁路径

/src/sync/mutex.go:218

func (m *Mutex) unlockSlow(new int32) {
	// 状态检查,如果new是0的话,不应该进入慢解锁
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	
	if new&mutexStarving == 0 {
		// 如果不在饥饿状态,此时就是正常的竞争,进入慢速解锁可能是因为存在等待的g
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				// 如果不存在等待者,或者锁已经被拿了、或者中途进入了饥饿模式。
				return
			}
			// Grab the right to wake someone.
			// 等待人-1,唤醒一个等待的g
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// 饥饿模式,将🔐的所有权移交给下一个等待者,并让出当前g的时间片从而让下一个等待者能立马开始执行。
		// 这时并未退出饥饿状态
		// 拿到🔐的等待者,将负责设置mutexLocked
		// 注意,相比上个分支,这里没有做等待人-1的操作,这个操作会由等待者执行。
		runtime_Semrelease(&m.sema, true, 1)
	}
}

mutex相关的还有一个方法-TryLock(),用于非阻塞式的取锁,相较于Lock(),其中的逻辑非常简单,就不做更多介绍。