golang源码阅读-sync.Mutex

【golang源码分析】sync.Mutex

概述

Mutex只有两个阻塞方法Lock和Unlock,有两种工作模式:normal和starvation。

goroutine在抢占锁时,会先自旋一段时间,如果抢占失败会被放在一个FIFO等待队列中休眠,当锁被释放时,会优先唤醒队首的goroutine。

在normal模式中,被唤醒的waiter会跟新到的goroutine竞争锁,一般来说新的goroutine已经在cpu中进行了,并且可能有不止一个goroutine,在这种情况下刚被唤醒的goroutine有很大概率会失败,而被重新放回FIFO队首。如果当一个waiter等待获取锁的时间超过1ms,会将mutex转换成starvation模式。

在starvation模式,当mutex被unlock时,持有权会直接移交到队首的goroutine中。新加入的goroutine不会再参与锁的竞争,而是直接放入等待队列的尾部。

mutex通过一个state变量来维护锁的状态信息。

  • 低一位mutexLocked:表示锁是否被锁定
  • 低二位mutexWoken:表示是否有goroutine在竞争锁,这个主要应用于unlock时判断是否有必要唤醒等待队列中的goroutine
  • 低三位mutexStarving:表示锁的模式starvation或normal,剩余高位用于标志等待队列的长度。

代码分析

定义

Mutex实现了Locker接口,维护两个变量——state用来维护锁的状态,同时通过操作sema来唤醒和休眠等待goroutine

type Mutex struct {
    state int32
    sema  uint32
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota

    starvationThresholdNs = 1e6
)

Lock

完整代码

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }

    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

详解

  1. 首先默认state为0,通过cas操作将锁的状态更新为mutexLocked,尝试快速获取锁。在并发度较小的情况下,mutex多数处于初始状态,可以提供加锁性能。如果快速获取锁失败,goroutine会不断地进行自旋抢锁、休眠、唤醒抢锁,直到抢到锁后break。在这期间,需要通过cas对state进行更新,只有更新成功的goroutine才能进行下一阶段的操作。
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
  1. 竞争锁的goroutine会先自旋一段时间直到mutex被unlocked、或切换到starvation模式、或自旋次数达到上限。自旋过程中,goroutine也会尝试通过cas将mutexWoken位置1,以通知Unlock操作不必唤醒等待队列中的goroutine。starvatione模式是不需要自旋的,因为锁的持有权会直接移交给等待队列队首的goroutine,自旋操作没有实际意义。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // 当前goroutine未更新成功过woken位,woken位为0,等待队列非空,通过cas尝试更新woken位
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()  // pause一段时间
        iter++
        old = m.state
        continue
}
  1. 从前面代码可以看到,退出自旋有三种原因,及其对应的处理如下:
    • mutex被Unlock:将mutexLocked置1,代表尝试抢锁,然后cas尝试更新state
    • mutex切换到starvation模式:不改变mutexLocked,waiter数量+1,cas更新state成功后后直接加入到等待队列里。
    • 自旋次数达到上限:不改变mutexLocked,waiter数量+1,如果starving变量为true,则将mutexStarving位置1。

starving是goroutine被唤醒时通过计算等待时间获取的,大于1ms则为true,表示需要切换到starvation模式。

这里需要注意,当mutex为unlocked的时候,不需要转换mutex模式。因为starving为true,说明该goroutine是normal模式下等待队列中刚被唤醒的waiter(原因需要看后面的代码,在starvation模式。会直接将mutex的持有权交给唤醒的goroutine,走不到这一步)。如果此时做了状态切换,可能会出现在starvation模式,等待队列为空的情况。因此, 只有自旋抢锁失败的情况下,才会由等待时间超过阈值的goroutine将mutex模式切换到starvation

new := old
// 锁被unlocked,将mutexLocked置1,代表尝试抢锁
if old&mutexStarving == 0 {
        new |= mutexLocked
}
// starving或自旋次数到上限, 等待数量+1
if old&(mutexLocked|mutexStarving) != 0 {
        new += 1 << mutexWaiterShift
}
// 如果已经是starvation状态,这一步没什么意义
if starving && old&mutexLocked != 0 {
        new |= mutexStarving
}
if awoke {
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        // 把awoke位给清掉
        new &^= mutexWoken
}
  1. cas 更新mutex状态,失败则重复上述操作,直到状态更新成功。
    • 如果是获取锁的动作,则直接break。
    • 否则将goroutine放到等待队列中,若waitStartTime不为0,则说明是waiter抢锁失败,应该重新返回队首。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 获取锁成功,break。接下来要处理starving和自旋次数达到上限的情况
        if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
        }
       // waitStartTime说明是被唤醒的goroutine抢锁失败了,应该重新放回队首(queueLifo=true)
        queueLifo := waitStartTime != 0
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        // 休眠
        runtime_SemacquireMutex(&m.sema, queueLifo)
  1. 被唤醒后,如果当前starving为true,或等待时间大于阈值,则将starving置为true,下一次自旋抢锁失败会将mutex切换到starvation模式。在starvation模式,该goroutine直接获取锁,否则的话要重复前面的操作跟新来的goroutine抢锁
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state

if old&mutexStarving != 0 {
        // starving模式不应该出现mutex被locked或woken,等待队列长度也不能为0,有的话说明这段代码逻辑有bug
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        // 获取锁并将等待数-1
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 等待时间小于阈值,或该goroutine为等待队列中最后一个waiter,切换到normal模式
        if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
        }
        // 把状态更新过去,获取锁并退出。这里不用cas,因为前面操作保证了低三位都不会被更新,高位本来是原子计数,所以直接add就可以
        atomic.AddInt32(&m.state, delta)
        break
}
awoke = true
iter = 0

Unlock

// mutex允许一个goroutine加锁,另一个goroutine解锁,不要求两个操作保证同一个goroutine
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 直接解锁,然后判断一下状态,如果不一致的话则抛异常
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

  // starvation模式,则直接唤醒blocked队列队首goroutine
  // normal模式,如果等待队列为空,或此时锁的状态发生了变化,则不用唤醒等待队列中的goroutine
  // 否则的话,将等待数减1,cas更新state,成功则唤醒一个waiter
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
     // starving模式,直接将持有权交给下一个waiter。mutexLocked是在队首waiter被唤醒后更新的,所以此时mutexLocked还是0,但是在starving       
     // 模式下,新来的goroutine不会更新mutexLocked位。配合Lock代码看。
        runtime_Semrelease(&m.sema, true)
    }
}

\

practice/golang/源码分析

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章