Go36-29,30-原子操作

原子操作

对于一个Go程序来说,GO语言运行时系统中的调度器会恰当的安排其中所有的goroutine的运行。不过,在同一时刻,只会有少数的goroutine真正处于运行状态。为了公平起见,调度器会频繁的切换这些goroutine。这个中断的时机有很多,任何两个语句执行的间隙,甚至是在某条语句执行的过程中都是可能的。即使这些语句在临界区之内也是一样的,互斥锁虽然能保护临界区中的代码串行执行,但是不能保证这些代码的 原子性 (atomicity)。

原子操作的特点

真正能够保证原子性执行的工具只有 原子操作 (atomic operation)。

原子操作在进行的过程中是不允许中断的。在底层,这会由CPU提供芯片级别的支持,所以绝对有效。

原子操作可以完全的消除竞态条件,并能够绝对的保证并发安全。并且它的执行速度比其他的同步工具快得多,通常会高出好几个数量级。

缺点

正是因为原子操作不能被中断,所要它需要足够简单,并且要求快速。因此,操作系统层面值对针对二进制位或整数的原子操作提供支持。

Go语言的原子操作是基于CPU和操作系统的,所以它也只针对少数数据类型的值提供了原子操作函数。这些函数都在标准库的sync/atomic中。

sync/atomic 包

sync/atomic 包中可以做的原子操作有:

  • 加法(add)
  • 比较并交换(compare and swap),简称CAS
  • 加载(load)
  • 存储(store)
  • 交换(swap)

这些函数针对的数据类型并不多。但是,对这些类型中的每一个,sync/stomic包都会有一套函数给予支持。这些数据类型有:

  • int32
  • int64
  • uint32
  • uint64
  • uintptr
  • unsafe.Pointer,这个类型在包里没有提供原子加法操作的函数
  • atomic.Value,包里还提供了这个类型,可以被用来存储任意类型的值

函数需要传入被操作值的指针

原子操作函数的第一个参数值都应该是那个被操作的值,并且是传入指针,比如:*int32。

原子操作函数需要的是被操作值的指针,而不是值本身。即使是unsafe.Pointer类型虽然本身已经是指针类型,但是原子操作函数里还要要这个值的指针。

只要原子操作函数拿到了被操作值的指针,就可以定位到存储该值的内存地址。只有这样,才能够通过底层的指令,操作这个内存地址上的数据。

加法操作也可以用来做减法

包里只提供了加法操作的函数,没有减法操作的函数,不过是可以实现减法的。比如:atomic.AddInt32。函数的第二个参数代表的差量,它的类型是int32,这个类型是有符号的。这里可以传个负整数就是做减法了。

对于atomic.AddInt64也是类型的。不过对于atomic.AddUint32和atomic.AddUint64要做原子减法就不能这么直接了,因为第二参数的值是uint32和uint64,这些类型是无符号的。比如要减3,差量就是-3,要先把差量转换为有符号的类型比如int32,然后再把该值转换为uint32,用表达式描述就是:

uint32(int32(-3))

不过上面这样写,会使编译器报错,因为这么做其实会让表达式的结果值溢出。不过可以先把int32(-3)先赋值给一个临时变量比如名字就叫delta,来绕过编译器的检查。

上面的那种方式比较好理解,另外还有一种方式第二的参数用下面的表达式:

^uint32(-(-3)-1)

上面这么做的原理,简单来说就是取补码,具体就要去了解一下计算机中的原码、补码、反码,以及是如何实现减法的了。

上面两中方式是等价的,下面是实例代码:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    num := uint32(18)
    delta := int32(-3)
    atomic.AddUint32(&num, uint32(delta))
    fmt.Println(num)

    atomic.AddUint32(&num, ^uint32(-(-3)-1))
    fmt.Println(num)
}

比较并交换

比较并交换操作即CAS操作,是有条件的交换操作,只有在条件满足的情况下才会进行值的交换。

交换,指的是把新值赋值给变量,并返回变量的旧值。

在进行CAS操作的时候,函数会先判断被操作变量的值,是否与预期的旧值相等。如果相等,就把新值赋给该变量,并返回true以表明交换操作已经进行。否则就忽略交换操作,并返回false。CAS操作并不是一个单一的操作,而是一种操作组合。这与其他原子操作都不同。正式因为如此,它的用途要更广泛一些。比如,将它与for语句联用,就可以实现一种简易的 自旋锁 (spinlock)。

自旋锁自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。

for {
    if atomic.CompareAndSwapInt32(&num, 10, 0) {
        fmt.Println("检查到num为10,清0")
        break
    }
    time.Sleep(time.Millisecond * 300)
}

下面是代码完整的展示,实现了建议的自旋锁:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    done := make(chan struct{})
    var num int32

    // 定时增加num的值
    go func() {
        defer func() {
            done <- struct{}{}
        }()
        for {
            time.Sleep(time.Millisecond * 500)
            newNum := atomic.AddInt32(&num, 1)
            fmt.Println("num:", newNum)
            if newNum > 10 {
                break
            }
        }
    }()

    // 定时检查num的值,如果等于10,就设置为0
    go func() {
        defer func() {
            done <- struct{}{}
        }()
        for {
            if atomic.CompareAndSwapInt32(&num, 10, 0) {
                fmt.Println("检查到num为10,清0")
                break
            }
            time.Sleep(time.Millisecond * 500)
        }
    }()

    <- done
    <- done
    fmt.Println("Over")
}
乐观锁

在for语句中的CAS操作,可以不停的检查某个需要满足的条件,一旦条件满足就退出for循环。这相当于只要条件不满足,当前流程就会被一直 阻塞 在这里。

这个在效果上与互斥锁类型,不过适用场景不同。互斥锁总是假设共享资源的状态会被其他的goroutine频繁的改变,是一种 悲观锁 。而这里是假设共享资源状态的改变并不频繁,所以你的操作一般都是如期望的那样成功,是一种更加乐观,更加宽松的做法,就是 乐观锁

下面的例子,启用了多个goroutine都要对num的值做加法操作。

package main

import (
    "time"
    "sync/atomic"
    "fmt"
)

var done chan struct{} = make(chan struct{})
var num int32

func CompareAndAdd(id, times int, increment int32) {
    defer func() {
        done <- struct{}{}
    }()
    for i := 0; i < times; i++ {
        for {
            currNum := atomic.LoadInt32(&num)  // 先获取当前的值
            newNum := currNum + increment  // 这里希望对num做加法
            // 假设是一个耗时的操作,就是有可能在这段时间里有别的goroutine已经修改了num
            time.Sleep(time.Millisecond * 300)
            // 比较现在num的值和操作前获取到的值是否一致,如果一致,表示表里没有别修改过,可以更新为新值
            // 如果不一致,表示在这段时间里num已经被别的goroutine修改过了,必须重新来过
            if atomic.CompareAndSwapInt32(&num, currNum, newNum) {
                fmt.Printf("更新num[%d-%d]: +%d = %d\n", id, i, increment, newNum)
                break
            } else {
                fmt.Printf("更新num失败[%d-%d],重试...\n", id, i)
            }
        }
    }
}

func main() {
    go CompareAndAdd(1, 6, 2)
    go CompareAndAdd(2, 4, 3)
    go CompareAndAdd(3, 3, 4)
    <- done
    <- done
    <- done
    fmt.Println("Over")
}

这里在把加法后的新值赋值给原来的变量num前,先检查此时num的值是否发生过变化了,如果没有发生变化,就可以将num设置为新值。否则就从头在做一次加法运行、检查、赋值,直到成功为止。在假设共享资源状态的改变并不频繁的前提下,这种实现是比悲观锁更好的。

读写操作都要实现原子操作

在已经保证了对一个变量的写操作都是原子操作,比如:加法、存储、交换等等。在对它进行读操作的时候,依然有必要使用原子操作。

参考读写锁,写操作和读操作之间是互斥的,这是为了防止读操作读到还没有被修改完的值。如果读操作读到一半就被中断了,等再回来继续读取的时候,就读到了修改前后两部分的内容。这显然破坏了值的完整性。所以,一旦决定对一个共享资源进行保护,就要做到完全的保护。

适用场景

由于原子操作函数只支持非常有限的数据类型,所以在很多应用场景下,互斥锁更加合适。不过如果当前场景下可以使用原子操作,就不要考虑互斥锁了。

因为原子操作函数的执行速度要比互斥锁快的多。而且,使用起来也更加简单,不会涉及临界区的选择,以及死锁等问题。就是原子操作更加高效,而互斥锁适用场景更广,优先考虑是否可以使用原子操作。

原子变量

为了扩大原子操作的适用范围,Go语言在1.4版本之后,在sync/atomic包中添加了一个新类型Value。此类型的值相当于一个容器,可以被用来原子的存储和加载任意的值。atomic.Value类型是开箱即用的,声明一个该类型的变量之后就可以直接使用了,可以称它为 原子变量 。而原子变量的值,可以称为 原子值

这个类型使用起来很简单,只有两个指针方法:Store()和Load(),不过还是有一些需要注意的地方。

原子值的复制

一旦atomic.Value类型的值,就是原子值被真正使用,它就不应该再被复制了。

只要用它来存储值了,就相当于开始真正使用了。atomic.Value类型属于结构体类型,而结构体类型属于值类型。

所以,复制该类型的值会产生一个完全分离的新值。这个新值相当于被复制的那个值的一个快照。之后,不论后者存储的值怎样改变,都不会影响到前者的使用,反之亦然。

这个是进行验证的示例代码:

func main() {
    var box atomic.Value
    box2 := box  // 原子值真正使用之前可以被复制
    v1 := [...]int{1,2,3}
    box.Store(v1)  // 对box1的改变,不会影响到box2
    fmt.Println(box.Load())
    fmt.Println(box2.Load())
}

上面我把原话都引用过来了,下面是我的理解。上面的 box2 := box 这句,编译器是有绿色的提示的,不影响运行但是应该要引起我们注意。然后在源码里也找到了一些建议:

// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
    noCopy noCopy

    v interface{}
}

// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://github.com/golang/go/issues/8005#issuecomment-190753527
// for details.
type noCopy struct{}

看着意思也就是上面说的,使用之后就不要再复制了。

不过既然这个类型是开箱即用的,那么只要再声明一个变量使用就好了,没有必要复制一个来使用。另外就是因为这是一个值类型,所以赋值的是副本,对原值的改变不会影响到副本。如果需要,就用指针。

上面示例中复制的用法很傻,应该不会也想不到要这么用。仔细想想,真正需要复制该值的情况可能是作为函数的参数,就是先声明一个开箱即用的原子值,然后在不同的函数里都把这个原子值作为参数。这里是可以的,而且也很方便。只要知道每个函数里都是不同的原子值就行了,就是值类型传参要注意的那些问题。下面是我想的一个场景:

package main

import (
    "fmt"
    "sync/atomic"
)

func loadBox(box atomic.Value, v interface{}) {
    box.Store(v)
    fmt.Println(box.Load())
}

func main() {
    var box atomic.Value  // 下面调用了3次函数,就是复制了box3次
    v1 := [...]int{1,2,3}
    loadBox(box, v1)
    v2 := "Hello"
    loadBox(box, v2)
    v3 := 123
    loadBox(box, v3)
}

原子值储值的规则

用原子值来储值有两条强制性的规则:

  1. 不能用原子值存储nil
  2. 向原子值存储额第一个值,决定了它今后能且只能存储哪一个类型的值
不能存储nil

就是不能把nil作为参数传入原子值的Store方法,否则就会引发panic。

这里还有注意接口类型的变量,它的动态值是nil,但是动态类型却不是nil,所以它的值就不等于nil。这样的一个变量的值是可以被存入原子值的。

就是不能存nil, box.Store(nil) 这样是要panic的。只要是有类型的,值是nil也是可以的,下面这样用是没问题的:

func main() {
    var box atomic.Value
    var v1 chan struct{}
    fmt.Println(v1)
    box.Store(v1)
    fmt.Println(box.Load())
}

上面提到了接口,其实Stroe方法接收的参数就是空接口:

func (v *Value) Store(x interface{}) {
    // 省略函数内容
}
存储的类型

接着上面的说,Store接收的参数是一个空接口,并且还说了不能是nil。所以只要是不是nil都可以作为参数。这只是作为第一次使用的情况。

一旦向原子值存储了第一个值,就决定了类型,之后再要存储,就必须还是同样的类型了。这个规则,就是通过接口也是绕不开的。原子值内部是依据被存储值的实际类型来做判断的。

这里还有个问题,我们是无法通过某个方法获知一个原子值是否已经被真正使用。并且,也没有办法通过常规的途径得到一个原子值可以存储值的实际类型。这使得误用原子值的可能性大大增加,尤其是在多个地方使用同一个原子值的时候。

通过下面的示例,可以理解一下:

func main() {
    var box atomic.Value
    box.Store("")  // 存入字符串
    box2 := box  // 在真正使用之后,就不应该被复制
    // box2.Store(1)  // 存字符串以外的类型就会引发panic
    box2.Store("1")
    _ = box2
}

使用建议

一、不要把内部使用的原子值暴露给外界。比如,声明一个全局的原子变量并不是一个正确的做法。这个变量的访问权限最起码也应该是包级私有的。

二、如果不得不让包外,或者模块外的代码使用你的原子值,那么可以声明一个包级私有的原子变量,然后再通过一个或多个公开的函数,让外界间接的使用到它。注意,这种情况下不要把原子值传递到外界,不论是传递原子值本身还是它的指针。

三、如果通过某个函数可以向内部的原子值存储值的话,那么就应该在这个函数中先判断被存储值类型的合法性。若不合法,则应该直接返回对应的错误值,从而避免panic的发生。

四、如果可能的话,我们可以把原子值封装到一个数据类型中,比如一个结构体。这样我们既可以通过该类型的方法更加安全地存储值,又可以在该类型中包含可村储值的合法类型信息。

概括一下,上面说的就是一个最佳实践,用一个结构体来封装。并且解决了前面提到的没有办法获取到原子值存储的实际类型的问题:

package main

import (
    "reflect"
    "os"
    "fmt"
    "sync/atomic"
)

// 创建结构体,封装atomic.Value和村储值的合法类型
// 字段都是私有的,下面提供了4个可导出的方法
type atomicValue struct {
    v atomic.Value
    t reflect.Type
}

// 提供方法,返回存储值的合法类型
func (av *atomicValue) TypeOfValue() reflect.Type {
    return av.t
}

// 提供方法,存储值。存储之前先检查类型
func (av *atomicValue) Store(v interface{}) error {
    if v == nil {
        return fmt.Errorf("不能存储nil")
    }
    t := reflect.TypeOf(v)
    if t != av.t {
        return fmt.Errorf("类型不正确, 需要: %s, 实际: %s", av.t, t)
    }
    av.v.Store(v)
    return nil
}

// 提供方法,获取值,虽然示例中没有用到
func (av *atomicValue) Load() interface{} {
    return av.v.Load()
}

// 创建结构体的方法,相当于构造方法
func NewAtomicValue(x interface{}) (*atomicValue, error) {
    if x == nil {
        return nil, fmt.Errorf("不能存储nil")
    }
    return &atomicValue{
        t: reflect.TypeOf(x),  // 获取变量的类型,返回reflect.Type类型
    }, nil
}

func main() {
    v := fmt.Errorf("随便的错误")
    box, err := NewAtomicValue(v)
    if err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
    }
    fmt.Printf("合法的类型是: %s\n", box.TypeOfValue())
    v2 := fmt.Errorf("还是一个错误类型")
    err = box.Store(v2)
    if err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
    }
    fmt.Printf("存储了一个值,类型是: %T\n", v2)
    fmt.Println("尝试存储一个其他类型的值")
    err = box.Store(1)
    if err != nil {
        fmt.Fprintf(os.Stderr, "ERROR: %s\n", err)
    }
}

存储引用类型

这里还要特别强调一点:尽量不要向原子值中存储引用类型的值。因为这很容易造成安全漏洞。尽量不要的意思就是要存还是可以存的,下面的示例中也给出了建议的方法:

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var box atomic.Value
    v := []int{1,2,3}  // 切片是引用类型
    box.Store(v)
    v[1] = 4  // 此处的操作不是并发安全的!
    fmt.Println(box.Load())  // 存储的值被改变了

    // 正确的做法如下:
    // 下面这个函数就是把引用类型复制一份出来,然后存储起来
    // 类似于把一个值类型传递给函数的效果
    store := func(v []int) {
        replica := make([]int, len(v))
        copy(replica, v)
        box.Store(replica)
    }
    store(v)
    v[2] = 5  // 再试着改变切面的值
    fmt.Println(box.Load())  // 存储的是副本的值,不会被上面的改变影响
}

这里把一个切片类型存储了原子值。切片类型属于引用类型,所以在外面依然可以改变切片的值。这相当于绕过了原子值而进行了非并发安全的操作。

这里应该先为切片创建一个完全的副本,然后再把副本存储box。如此一来,在对原来的切片做修改都不会破坏box提供的安全保护。

总结

原子操作明显比互斥锁要更加轻便,但是限制也很明显。所以如果可以使用原子操作的话,一定是用原子操作更好。

现在有了原子值,突破了一些原子操作的限制。在原子值与互斥锁之间选择的时候,就需要仔细考虑了。这篇里讲了很多使用原子值时候的注意事项,可能用的时候就不如互斥锁这么好用了。

另外在CAS中还会遇到一个ABA问题,而原子类型应该就会有这个ABA问题。此时就要用互斥锁了,除非业务对ABA问题不敏感

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章