sync.mutex 源代码分析

sync.Mutex 是Go标准库中常用的一个排外锁。当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放。

sync.Mutex 的实现也是经过多次的演化,功能逐步加强,增加了公平的处理和饥饿机制。

初版的 Mutex

首先我们来看看Russ Cox在2008提交的第一版的 Mutex 实现。

type Mutex struct {
    key int32;
    sema int32;
}
    
func xadd(val *int32, delta int32) (new int32) {
    for {
        v := *val;
        if cas(val, v, v+delta) {
            return v+delta;
        }
    }
    panic("unreached")
}
        
func (m *Mutex) Lock() {
        if xadd(&m.key,1) ==1 {
            // changed from 0 to 1; we hold lock
            return;
        }
        sys.semacquire(&m.sema);
    }
    
    func (m *Mutex) Unlock() {
        if xadd(&m.key,-1) ==0 {
            // changed from 1 to 0; no contention
            return;
        }
        sys.semrelease(&m.sema);
    }

可以看到,这简单几行就可以实现一个排外锁。通过 caskey 进行加一, 如果 key 的值是从 0 加到 1 , 则直接获得了锁。否则通过 semacquire 进行sleep, 被唤醒的时候就获得了锁。

2012年, commit dd2074c8 做了一次大的改动,它将 waiter count (等待者的数量)和 锁标识 分开来(内部实现还是合用使用 state 字段)。新来的 goroutine 占优势,会有更大的机会获取锁。

获取锁, 指当前的gotoutine拥有锁的所有权,其它goroutine只有等待。

2015年, commit edcad863 , Go 1.5中 mutex 实现为全协作式的,增加了spin机制,一旦有竞争,当前goroutine就会进入调度器。在临界区执行很短的情况下可能不是最好的解决方案。

2016年 commit 0556e262 , Go 1.9中增加了饥饿模式,让锁变得更公平,不公平的等待时间限制在1毫秒,并且修复了一个大bug,唤醒的goroutine总是放在等待队列的尾部会导致更加不公平的等待时间。

目前这个版本的 mutex 实现是相当的复杂, 如果你粗略的瞄一眼,很难理解其中的逻辑, 尤其实现中字段的共用,标识的位操作,sync函数的调用、正常模式和饥饿模式的改变等。

本文尝试解析当前 sync.Mutex 的实现,梳理一下 LockUnlock 的实现。

源代码分析

根据 Mutex 的注释,当前的 Mutex 有如下的性质。这些注释将极大的帮助我们理解 Mutex 的实现。

互斥锁有两种状态:正常状态和饥饿状态。

在正常状态下,所有等待锁的goroutine按照 FIFO 顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式。

在饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。

如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。

正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。

在分析源代码之前, 我们要从多线程(goroutine)的并发场景去理解为什么实现中有很多的分支。

当一个goroutine获取这个锁的时候, 有可能这个锁根本没有竞争者, 那么这个goroutine轻轻松松获取了这个锁。

而如果这个锁已经被别的goroutine拥有, 就需要考虑怎么处理当前的期望获取锁的goroutine。

同时, 当并发goroutine很多的时候,有可能会有多个竞争者, 而且还会有通过信号量唤醒的等待者。

sync.Mutex 只包含两个字段:

type Mutex struct {
    state int32
    sema  uint32
}

state 是一个共用的字段, 第0个 bit 标记这个 mutex 是否已被某个goroutine所拥有, 下面为了描述方便称之为 state 已加锁,或者 mutex 已加锁。 如果 第0个 bit为0, 下文称之为 state 未被锁, 此mutex目前没有被某个goroutine所拥有。

第1个bit 标记这个 mutex 是否已唤醒, 也就是有某个唤醒的 goroutine 要尝试获取锁。

第2个bit 标记这个 mutex 状态, 值为1表明此锁已处于饥饿状态。

同时,尝试获取锁的goroutine也有状态,有可能它是新来的goroutine,也有可能是被唤醒的goroutine, 可能是处于正常状态的goroutine, 也有可能是处于饥饿状态的goroutine。

Lock

func (m *Mutex) Lock() {
    // 如果mutext的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
    // 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。
    if atomic.CompareAndSwapInt32(&m.state,0, mutexLocked) {
        return
    }

    // 标记本goroutine的等待时间
    var waitStartTime int64
    // 本goroutine是否已经处于饥饿状态
    starving := false
    // 本goroutine是否已唤醒
    awoke := false
    
    // 自旋次数
    iter :=0
    
    // 复制锁的当前状态
    old := m.state
    
    for {
        // 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
        // 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。
        // 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
            if !awoke && old&mutexWoken ==0 && old>>mutexWaiterShift !=0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        
        // 到了这一步, state的状态可能是:
        // 1. 锁还没有被释放,锁处于正常状态
        // 2. 锁还没有被释放, 锁处于饥饿状态
        // 3. 锁已经被释放, 锁处于正常状态
        // 4. 锁已经被释放, 锁处于饥饿状态
        //
        // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)


        // new 复制 state的当前状态, 用来设置新的状态
        // old 是锁当前的状态
        new := old
        
        // 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
        // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
        if old&mutexStarving ==0 {
            new |= mutexLocked
        }

        // 将等待队列的等待者的数量加1
        if old&(mutexLocked|mutexStarving) !=0 {
            new +=1 << mutexWaiterShift
        }
        
        // 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
        // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
        if starving && old&mutexLocked !=0 {
            new |= mutexStarving
        }
        
        // 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
        // 总之state的新状态不再是woken状态.
        if awoke {
            if new&mutexWoken ==0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        
        // 通过CAS设置new state值.
        // 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
            // 那么当前goroutine已经获取了锁的拥有权,返回
            if old&(mutexLocked|mutexStarving) ==0 {
                break 
            }
            
            // 设置/计算本goroutine的等待时间
            queueLifo := waitStartTime !=0
            if waitStartTime ==0 {
                waitStartTime = runtime_nanotime()
            }
            
            // 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
            // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
            // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
            runtime_SemacquireMutex(&m.sema, queueLifo)
            
            // sleep之后,此goroutine被唤醒

            // 计算当前goroutine是否已经处于饥饿状态.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

            // 得到当前的锁状态
            old = m.state
            
            // 如果当前的state已经是饥饿状态
            // 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine
            if old&mutexStarving !=0 { 
                
                // 如果当前的state已被锁,或者已标记为唤醒, 或者等待的队列中不为空,
                // 那么state是一个非法状态
                if old&(mutexLocked|mutexWoken) !=0 || old>>mutexWaiterShift ==0 {
                    throw("sync: inconsistent mutex state")
                }
                
                // 当前goroutine用来设置锁,并将等待的goroutine数减1.
                delta := int32(mutexLocked -1<<mutexWaiterShift)
                
                // 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态,
                // 那么我们需要把锁的state状态设置为正常模式.
                if !starving || old>>mutexWaiterShift ==1 {
                    // 退出饥饿模式
                    delta -= mutexStarving
                }
                
                // 设置新state, 因为已经获得了锁,退出、返回
                atomic.AddInt32(&m.state, delta)
                break
            }
            
            // 如果当前的锁是正常模式,本goroutine被唤醒,自旋次数清零,从for循环开始处重新开始
            awoke = true
            iter =0
        } else { // 如果CAS不成功,重新获取锁的state, 从for循环开始处重新开始
            old = m.state
        }
    }
}

Unlock

func (m *Mutex) Unlock() {
    // 如果state不是处于锁的状态, 那么就是Unlock根本没有加锁的mutex, panic
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked ==0 {
        throw("sync: unlock of unlocked mutex")
    }
    
    // 释放了锁,还得需要通知其它等待者
    // 锁如果处于饥饿状态,直接交给等待队列的第一个, 唤醒它,让它去获取锁
    // 锁如果处于正常状态,

    // new state如果是正常状态
    if new&mutexStarving ==0 {
        old := new
        for {
            // 如果没有等待的goroutine, 或者锁不处于空闲的状态,直接返回.
            if old>>mutexWaiterShift ==0 || old&(mutexLocked|mutexWoken|mutexStarving) !=0 {
                return
            }

            // 将等待的goroutine数减一,并设置woken标识
            new = (old -1<<mutexWaiterShift) | mutexWoken

            // 设置新的state, 这里通过信号量会唤醒一个阻塞的goroutine去获取锁.
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 饥饿模式下, 直接将锁的拥有权传给等待队列中的第一个.
        // 注意此时state的mutexLocked还没有加锁,唤醒的goroutine会设置它。
        // 在此期间,如果有新的goroutine来请求锁, 因为mutex处于饥饿状态, mutex还是被认为处于锁状态,
        // 新来的goroutine不会把锁抢过去.
        runtime_Semrelease(&m.sema, true)
    }
}

问个问题

如果一个goroutine g1 通过 Lock 获取了锁, 在持有锁的期间, 另外一个goroutine g2 调用 Unlock 释放这个锁, 会出现什么现象?

  1. g2 调用 Unlock panic
  2. g2 调用 Unlock 成功,但是如果将来 g1 调用 Unlock 会 panic
  3. g2 调用 Unlock 成功,如果将来 g1 调用 Unlock 也成功

运行下面的例子试试:

package main

import (
    "sync"
    "time"
)

func main() {
    var mu sync.Mutex
    go func() {
        mu.Lock()
        time.Sleep(10 * time.Second)
        mu.Unlock()
    }()

    time.Sleep(time.Second)
    mu.Unlock()

    select {}
}
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章