在golang的并发编程中,为了保证访问数据的安全性,经常会用到mutex去保证goroutine的同步,本文基于golang 1.14.6版本去分析mutex实现, golang中的mutex是一种排它锁,其实现结构如下:
// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
state int32
sema uint32
}
官方文档中有介绍,mutex是一个排它锁,初始化时是未上锁的状态,并且要注意使用了mutex后不能对该mutex进行拷贝。
mutex结构中有两个字段,第一个是state字段,表示mutex的状态,标准库中有定义这么几种状态:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
mutexWaiterShift = iota // 用来维护等待队列长度
)
state的最低位表示该mutex是否释放
state的倒数第二位表示mutex是否唤醒
state的倒数第三位表示mutex是否处于饥饿状态
第二个是sema字段,sema用来提供休眠和唤醒goroutine的功能,相当于一个等待队列。
以下是标准库对mutex的注释简介:
// Mutex can be in 2 modes of operations: normal and starvation.
// In normal mode waiters are queued in FIFO order, but a woken up waiter
// does not own the mutex and competes with new arriving goroutines over
// the ownership. New arriving goroutines have an advantage -- they are
// already running on CPU and there can be lots of them, so a woken up
// waiter has good chances of losing. In such case it is queued at front
// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
// it switches mutex to the starvation mode.
//
// In starvation mode ownership of the mutex is directly handed off from
// the unlocking goroutine to the waiter at the front of the queue.
// New arriving goroutines don't try to acquire the mutex even if it appears
// to be unlocked, and don't try to spin. Instead they queue themselves at
// the tail of the wait queue.
//
// If a waiter receives ownership of the mutex and sees that either
// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
// it switches mutex back to normal operation mode.
//
// Normal mode has considerably better performance as a goroutine can acquire
// a mutex several times in a row even if there are blocked waiters.
// Starvation mode is important to prevent pathological cases of tail latency.
对应的中文含义如下:
mutex有两种状态:正常状态和饥饿状态
在正常状态下所有等待者(goroutine)按照FIFO的顺序排队,刚唤醒的goroutine不会
马上拥有锁,而是要和新申请锁的goroutine进行竞争,但是新竞争锁的goroutine会
更有优势,因为此刻它正占着CPU(这样的好处是不需要把正在运行的goroutine置为休眠
状态,而把休眠的goroutine置为运行状态,直接让运行的goroutine继续运行),刚唤醒
的goroutine竞争失败的情况下,会把该goroutine放到等待队列的最前面。在等待的
goroutine如果超过1秒没有获得锁,会把锁转为饥饿状态。
饥饿状态下当锁的所有权在一个goroutine解锁后会直接给等待队列的第一个goroutine
新申请的goroutine不会去获取锁,即时当前锁处于unlock状态,也不会自旋等待,而是
直接放到等待队列的最后面。
如果一个等待的goroutine拿到了锁,并且满足如下任意一个条件:
1. 它是队列中的最后一个
2. 它等待时间小于1ms
会将锁置为正常状态
正常状态有更好的性能,这里我觉得减少了goroutune状态的切换,饥饿状态也是很重要
的,避免等待队列尾部的goroutine长时间获取不到锁。
mutex有两个操作,Lock和UnLock,对应加锁,和解锁,内部实现如下:
Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}
如果锁处于unlock状态,会直接获取锁,如果不能获取锁,那么会调用 lockSlow函数:
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 记录此goroutine的等待时间
starving := false // 是否处于饥饿状态
awoke := false // 是否处于唤醒状态
iter := 0 // 自旋次数
old := m.state // 记录当前状态
for {
// 判断 old的状态,如果是饥饿状态就不会自旋, runtine_canSpin 会判断当前系统环境
// 是否支持自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 设置唤醒状态,因为此时已唤醒
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// 如果不是饥饿状态,新的goroutine设置锁,
// 如果是饥饿状态,不设置锁,因为直接会把goroutine放到等待队列
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 饥饿状态下 等待队列数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果当前 goroutine 处于 饥饿状态(等待时间超过1ms) 那么把锁也置为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
//
if awoke {
// 如果处于唤醒状态但是没有设置唤醒标记位,报错
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 新状态需要清除唤醒状态标记位,因为此goroutine要么获取锁,要么休眠
new &^= mutexWoken
}
// 通过 CAS 操作 更新 锁的状态
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果当前锁处于为 mutexLocked 和 mutexStarving 都为 0 ,说明该
// goroutine可以直接获取锁,此时直接返回
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 计算等待时间
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 未获取到锁
// queueLife = true ,放到等待队列头部
// queueLife = false , 放到等待队列尾部
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 计算 饥饿状态 的变量
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果锁处于饥饿状态
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果 此时不为饥饿状态或者 此goroutine为队列最后一个,退出饥饿状态
if !starving || old>>mutexWaiterShift == 1 {
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
// 如果锁处于正常状态,此goroutine获取锁,自旋次数置0
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
整体流程大概是,如果新的goroutine能拿到锁,即此时mutex处于未加锁状态,就进行加锁,并通过CAS操作将mutex置为加锁状态。
如果不能拿到锁,会一直做一个for循环,判断能否进行自旋等待,如果处于饥饿状态,直接将该goroutine放到等待队列尾部,如果处于正常状态,看锁释放是unlock状态,如果拿不到锁,通过sleep阻塞该goroutine。
UnLock
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
如果new不为0,说明有其他goroutine在等待,原因在Lock代码有介绍,新来的goroutine会置一个唤醒状态,此时state减1就不等于0了,这个时候需要在unlockSlow里面去唤醒等待者。
func (m *Mutex) unlockSlow(new int32) {
// 如果释放一个没有加锁状态的mutex,直接panic
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 如果锁处于正常状态
if new&mutexStarving == 0 {
old := new
for {
// 没有等待者或者锁不处于空闲状态,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒等待者,并且将等待者数量减1,并且设置标识
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
//处于饥饿状态时会把锁给队列中的第一个goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
UnLock的实现比较简单,分为两个过程:
如果锁处于正常状态,有两种情况,一种是没有新的goroutine获取锁,那么此时通过信号量去唤醒一个阻塞的goroutine获取锁,因为在正常状态下,等待的goroutine有可能会在自旋。runtime_Semrelease第二个参数如果为true表示直接唤醒第一个,如果为false则随机唤醒。 如果有新的goroutine获取锁,新的goroutine会直接获取锁,在CAS中会直接跳过,这是合理的。
如果是饥饿状态,那么直接通过runtime_Semrelease设置第二个参数为true直接唤醒等待队列中的第一个goroutine。
更多干货请关注微信号: 黑客的成长秘籍
因篇幅问题不能全部显示,请点此查看更多更全内容