Go Channel 应用模式

Channel是Go中的一种类型,和goroutine一起为Go提供了并发技术, 它在开发中得到了广泛的应用。Go鼓励人们通过Channel在goroutine之间传递数据的引用(就像把数据的owner从一个goroutine传递给另外一个goroutine), Effective Go 总结了这么一句话:

Do not communicate by sharing memory; instead, share memory by communicating.

Go内存模型 指出了channel作为并发控制的一个特性:

A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)

除了正常的在goroutine之间安全地传递共享数据, Channel还可以玩出很多的花样(模式), 本文列举了一些channel的应用模式。

促成本文诞生的因素主要包括:

  1. eapache的channels库
  2. concurrency in go 这本书
  3. Francesc Campoy的 justforfun系列中关于merge channel的实现
  4. 我在出版Scala集合手册这本书中对Scala集合的启发

下面就让我们以实例的方式看看这么模式吧。

Lock/TryLock 模式

我们知道, Go的标准库 syncMutex ,可以用来作为锁,但是 Mutex 却没有实现 TryLock 方法。

我们对于 TryLock 的定义是当前goroutine尝试获得锁, 如果成功,则获得了锁,返回true, 否则返回false。我们可以使用这个方法避免在获取锁的时候当前goroutine被阻塞住。

本来,这是一个常用的功能,在一些其它编程语言中都有实现,为什么Go中没有实现的? issue#6123 有详细的讨论,在我看来,Go核心组成员本身对这个特性没有积极性,并且认为通过channel可以实现相同的方式。

Hacked Lock/TryLock 模式

其实,对于标准库的 sync.Mutex 要增加这个功能很简单,下面的方式就是通过 hack 的方式为 Mutex 实现了 TryLock 的功能。

const mutexLocked =1 << iota

type Mutex struct {
	mu sync.Mutex
}

func (m *Mutex) Lock() {
	m.mu.Lock()
}

func (m *Mutex) Unlock() {
	m.mu.Unlock()
}

func (m *Mutex) TryLock() bool {
	return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)),0, mutexLocked)
}

func (m *Mutex) IsLocked() bool {
	return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked
}

如果你看一下 Mutex 实现的源代码,就很容易理解上面的这段代码了,因为 mutex 实现锁主要利用 CAS 对它的一个int32字段做操作。

上面的代码还额外增加了一个 IsLocked 方法,不过这个方法一般不常用,因为查询和加锁这两个方法执行的时候不是一个原子的操作,素以这个方法一般在调试和打日志的时候可能有用。

TryLock By Channel

既然标准库中不准备在 Mutex 上增加这个方法,而是推荐使用channel来实现,那么就让我们看看如何使用 channel来实现。

type Mutex struct {
	ch chan struct{}
}

func NewMutex() *Mutex {
	return &Mutex{make(chan struct{},1)}
}

func (m *Mutex) Lock() {
	m.ch <- struct{}{}
}

func (m *Mutex) Unlock() {
	<-m.ch
}

func (m *Mutex) TryLock() bool {
	select {
	case m.ch <- struct{}{}:
		return true
	default:
	}
	return false
}

func (m *Mutex) IsLocked() bool {
	return len(m.ch) >0
}

主要是利用channel边界情况下的阻塞特性实现的。

你还可以将缓存的大小从1改为n,用来处理n个锁(资源)。

TryLock with Timeout

有时候,我们在获取一把锁的时候,由于有竞争的关系,在锁被别的goroutine拥有的时候,当前goroutine没有办法立即获得锁,只能阻塞等待。标准库并没有提供等待超时的功能,我们尝试实现它。

type Mutex struct {
	ch chan struct{}
}

func NewMutex() *Mutex {
	return &Mutex{make(chan struct{},1)}
}

func (m *Mutex) Lock() {
	m.ch <- struct{}{}
}

func (m *Mutex) Unlock() {
	<-m.ch
}

func (m *Mutex) TryLock(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	select {
	case m.ch <- struct{}{}:
		timer.Stop()
		return true
	case <-time.After(timeout):
	}
	return false
}

func (m *Mutex) IsLocked() bool {
	return len(m.ch) >0
}

你也可以把它用 Context 来改造,不是利用超时,而是利用 Context 来取消/超时获得锁的操作,这个作业留给读者来实现。

Or Channel 模式

当你等待多个信号的时候,如果收到任意一个信号, 就执行业务逻辑,忽略其它的还未收到的信号。

举个例子, 我们往提供相同服务的n个节点发送请求,只要任意一个服务节点返回结果,我们就可以执行下面的业务逻辑,其它n-1的节点的请求可以被取消或者忽略。当n=2的时候,这就是 back request 模式。 这样可以用资源来换取latency的提升。

需要注意的是,当收到任意一个信号的时候, 其它信号都被忽略 。如果用channel来实现,只要从任意一个channel中接收到一个数据,那么所有的channel都可以被关闭了(依照你的实现,但是输出的channel肯定会被关闭)。

有三种实现的方式: goroutine、reflect和递归。

Goroutine方式

func or(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		var once sync.Once
		for _, c := range chans {
			go func(c <-chan interface{}) {
				select {
				case <-c:
					once.Do(func() { close(out) })
				case <-out:
				}
			}(c)
		}
	}()
	return out
}

or 函数可以处理n个channel,它为每个channel启动一个goroutine,只要任意一个goroutine从channel读取到数据,输出的channel就被关闭掉了。

为了避免并发关闭输出channel的问题,关闭操作只执行一次。

Reflect方式

Go的反射库针对select语句有专门的数据( reflect.SelectCase )和函数( reflect.Select )处理。

所以我们可以利用反射“随机”地从一组可选的channel中接收数据,并关闭输出channel。

这种方式看起来更简洁。

func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case0:
		return nil
	case1:
		return channels[0]
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		var cases []reflect.SelectCase
		for _, c := range channels {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}

		reflect.Select(cases)
	}()

	return orDone
}

递归方式

递归方式一向是比较开脑洞的实现,下面的方式就是分而治之的方式,逐步合并channel,最终返回一个channel。

func or(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case0:
		return nil
	case1:
		return channels[0]
	}

	orDone := make(chan interface{})
	go func() {
		defer close(orDone)

		switch len(channels) {
		case2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			m := len(channels) /2
			select {
			case <-or(channels[:m]...):
			case <-or(channels[m:]...):
			}
		}
	}()

	return orDone
}

在后面的扇入(合并)模式中,我们还是会使用相同样的递归模式来合并多个输入channel,根据 justforfun 的测试结果,这种递归的方式要比goroutine、Reflect更有效。

Or-Done-Channel模式

这种模式是我们经常使用的一种模式,通过一个信号channel(done)来控制(取消)输入channel的处理。

一旦从done channel中读取到一个信号,或者done channel被关闭, 输入channel的处理则被取消。

这个模式提供一个简便的方法,把done channel 和 输入 channel 融合成一个输出channel。

func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

扇入模式

扇入模式(FanIn)是将多个同样类型的输入channel合并成一个同样类型的输出channel,也就是channel的合并。

Goroutine方式

每个channel起一个goroutine。

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		var wg sync.WaitGroup
		wg.Add(len(chans))

		for _, c := range chans {
			go func(c <-chan interface{}) {
				for v := range c {
					out <- v
				}
				wg.Done()
			}(c)
		}

		wg.Wait()
		close(out)
	}()
	return out
}

Reflect

利用反射库针对select语句的处理合并输入channel。

下面这种实现方式其实还是有些问题的, 在输入channel读取比较均匀的时候比较有效,否则性能比较低下。

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		var cases []reflect.SelectCase
		for _, c := range chans {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(c),
			})
		}

		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok { //remove this case
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- v.Interface()
		}
	}()
	return out

}

递归方式

这种方式虽然理解起来不直观,但是性能还是不错的(输入channel不是很多的情况下递归层级不会很高,不会成为瓶颈)

func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
	switch len(chans) {
	case0:
		c := make(chan interface{})
		close(c)
		return c
	case1:
		return chans[0]
	case2:
		return mergeTwo(chans[0], chans[1])
	default:
		m := len(chans) /2
		return mergeTwo(
			fanInRec(chans[:m]...),
			fanInRec(chans[m:]...))
	}
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
	c := make(chan interface{})

	go func() {
		defer close(c)
		for a != nil || b != nil {
			select {
			case v, ok := <-a:
				if !ok {
					a = nil
					continue
				}
				c <- v
			case v, ok := <-b:
				if !ok {
					b = nil
					continue
				}
				c <- v
			}
		}
	}()
	return c
}

Tee模式

扇出模式(FanOut)是将一个输入channel扇出为多个channel。

扇出行为至少可以分为两种:

  1. 从输入channel中读取一个数据,发送给每个输入channel,这种模式称之为Tee模式
  2. 从输入channel中读取一个数据,在输出channel中选择一个channel发送

本节只介绍第一种情况,下一节介绍第二种情况

Goroutine方式

将读取的值发送给每个输出channel, 异步模式可能会产生很多的goroutine。

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
	go func() {
		defer func() {
			for i :=0; i < len(out); i++ {
				close(out[i])
			}
		}()

		for v := range ch {
			v := v
			for i :=0; i < len(out); i++ {
				i := i
				if async {
					go func() {
						out[i] <- v
					}()
				} else {
					out[i] <- v
				}
			}
		}
	}()
}

Reflect方式

这种模式一旦一个输出channel被阻塞,可能会导致后续的处理延迟。

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i :=0; i < len(out); i++ {
				close(out[i])
			}
		}()

		cases := make([]reflect.SelectCase, len(out))
		for i := range cases {
			cases[i].Dir = reflect.SelectSend
		}

		for v := range ch {
			v := v
			for i := range cases {
				cases[i].Chan = reflect.ValueOf(out[i])
				cases[i].Send = reflect.ValueOf(v)
			}

			for _ = range cases { // for each channel
				chosen, _, _ := reflect.Select(cases)
				cases[chosen].Chan = reflect.ValueOf(nil)
			}
		}
	}()
}

分布模式

分布模式将从输入channel中读取的值往输出channel中的其中一个发送。

Goroutine方式

roundrobin的方式选择输出channel。

func fanOut(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i :=0; i < len(out); i++ {
				close(out[i])
			}
		}()

		// roundrobin
		var i =0
		var n = len(out)
		for v := range ch {
			v := v
			out[i] <- v
			i = (i +1) % n
		}
	}()
}

Reflect方式

利用发射随机的选择。

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
	go func() {
		defer func() {
			for i :=0; i < len(out); i++ {
				close(out[i])
			}
		}()

		cases := make([]reflect.SelectCase, len(out))
		for i := range cases {
			cases[i].Dir = reflect.SelectSend
			cases[i].Chan = reflect.ValueOf(out[i])

		}

		for v := range ch {
			v := v
			for i := range cases {
				cases[i].Send = reflect.ValueOf(v)
			}
			_, _, _ = reflect.Select(cases)
		}
	}()
}

eapache

eapache/channels 提供了一些channel应用模式的方法,比如上面的扇入扇出模式等。

因为go本身的channel无法再进行扩展, eapache/channels 库定义了自己的channel接口,并提供了与channel方便的转换。

eapache/channels 提供了四个方法:

  • Distribute: 从输入channel读取值,发送到其中一个输出channel中。当输入channel关闭后,输出channel都被关闭
  • Tee: 从输入channel读取值,发送到所有的输出channel中。当输入channel关闭后,输出channel都被关闭
  • Multiplex: 合并输入channel为一个输出channel, 当所有的输入都关闭后,输出才关闭
  • Pipe: 将两个channel串起来

同时对上面的四个函数还提供了 WeakXXX 的函数,输入关闭后不会关闭输出。

下面看看对应的函数的例子。

Distribute

func testDist() {
	fmt.Println("dist:")
	a := channels.NewNativeChannel(channels.None)
	outputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}

	channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
	//channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])

	go func() {
		for i :=0; i <5; i++ {
			a.In() <- i
		}
		a.Close()
	}()

	for i :=0; i <6; i++ {
		var v interface{}
		var j int
		select {
		case v = <-outputs[0].Out():
			j =0
		case v = <-outputs[1].Out():
			j =1
		case v = <-outputs[2].Out():
			j =2
		case v = <-outputs[3].Out():
			j =3
		}
		fmt.Printf("channel#%d: %d\n", j, v)
	}

}

Tee

func testTee() {
	fmt.Println("tee:")
	a := channels.NewNativeChannel(channels.None)
	outputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}

	channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
	//channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])

	go func() {
		for i :=0; i <5; i++ {
			a.In() <- i
		}
		a.Close()
	}()

	for i :=0; i <20; i++ {
		var v interface{}
		var j int
		select {
		case v = <-outputs[0].Out():
			j =0
		case v = <-outputs[1].Out():
			j =1
		case v = <-outputs[2].Out():
			j =2
		case v = <-outputs[3].Out():
			j =3
		}
		fmt.Printf("channel#%d: %d\n", j, v)
	}
}

Multiplex

func testMulti() {
	fmt.Println("multi:")
	a := channels.NewNativeChannel(channels.None)
	inputs := []channels.Channel{
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
		channels.NewNativeChannel(channels.None),
	}

	channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
	//channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])

	go func() {
		for i :=0; i <5; i++ {
			for j := range inputs {
				inputs[j].In() <- i
			}
		}
		for i := range inputs {
			inputs[i].Close()
		}
	}()

	for v := range a.Out() {
		fmt.Printf("%d ", v)
	}
}

Pipe

func testPipe() {
	fmt.Println("pipe:")
	a := channels.NewNativeChannel(channels.None)
	b := channels.NewNativeChannel(channels.None)

	channels.Pipe(a, b)
	// channels.WeakPipe(a, b)

	go func() {
		for i :=0; i <5; i++ {
			a.In() <- i
		}
		a.Close()
	}()

	for v := range b.Out() {
		fmt.Printf("%d ", v)
	}
}

集合操作

从channel的行为来看,它看起来很像一个数据流,所以我们可以实现一些类似Scala 集合的操作。

Scala的集合类提供了丰富的操作(方法), 当然其它的一些编程语言或者框架也提供了类似的方法, 比如Apache Spark、Java Stream、ReactiveX等。

下面列出了一些方法的实现,我相信经过一些人的挖掘,相关的方法可以变成一个很好的类库,但是目前我们先看一些例子。

skip

skip函数是从一个channel中跳过开一些数据,然后才开始读取。

skipN

skipN跳过开始的N个数据。

func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i :=0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()

	return takeStream
}

skipFn

skipFn 提供Fn函数为true的数据,比如跳过偶数。

func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !fn(v) {
					takeStream <- v
				}
			}
		}
	}()
	return takeStream
}

skipWhile

跳过开头函数fn为true的数据。

func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		take := false
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !take {
					take = !fn(v)
					if !take {
						continue
					}
				}
				takeStream <- v
			}
		}
	}()
	return takeStream
}

take

skip的反向操作,读取一部分数据。

takeN

takeN 读取开头N个数据。

func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for i :=0; i < num; i++ {
			select {
			case <-done:
				return
			case takeStream <- <-valueStream:
			}
		}
	}()
	return takeStream
}

takeFn

takeFn 只筛选满足fn的数据。

func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if fn(v) {
					takeStream <- v
				}
			}
		}
	}()
	return takeStream
}

takeWhile

takeWhile只挑选开头满足fn的数据。

func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} {
	takeStream := make(chan interface{})
	go func() {
		defer close(takeStream)
		for {
			select {
			case <-done:
				return
			case v := <-valueStream:
				if !fn(v) {
					return
				}
				takeStream <- v
			}
		}
	}()
	return takeStream
}

flat

平展(flat)操作是一个有趣的操作。

如果输入是一个channel,channel中的数据还是相同类型的channel, 那么flat将返回一个输出channel,输出channel中的数据是输入的各个channel中的数据。

它与扇入不同,扇入的输入channel在调用的时候就是固定的,并且以数组的方式提供,而flat的输入是一个channel,可以运行时随时的加入channel。

func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			select {
			case <-done:
				return
			case v, ok := <-c:
				if ok == false {
					return
				}
				select {
				case valStream <- v:
				case <-done:
				}
			}
		}
	}()
	return valStream
}
func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
	valStream := make(chan interface{})
	go func() {
		defer close(valStream)
		for {
			var stream <-chan interface{}
			select {
			case maybeStream, ok := <-chanStream:
				if ok == false {
					return
				}
				stream = maybeStream
			case <-done:
				return
			}
			for val := range orDone(done, stream) {
				select {
				case valStream <- val:
				case <-done:
				}
			}
		}
	}()
	return valStream
}

map

map和reduce是一组常用的操作。

map将一个channel映射成另外一个channel, channel的类型可以不同。

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	if in == nil {
		close(out)
		return out
	}

	go func() {
		defer close(out)

		for v := range in {
			out <- fn(v)
		}
	}()

	return out
}

因为 map 是go的关键字,所以我们不能命名函数类型为 map ,这里用 mapChan 代替。

比如你可以处理一个公司员工工资的channel, 输出一个扣税之后的员工工资的channel。

reduce

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
	if in == nil {
		return nil
	}

	out := <-in
	for v := range in {
		out = fn(out, v)
	}

	return out
}

你可以用`reduce`实现`sum`、`max`、`min`等聚合操作。

总结

本文列出了channel的一些深入应用的模式,相信通过阅读本文,你可以更加深入的了解Go的channel类型,并在开发中灵活的应用channel。也欢迎你在评论中提出更多的 channel的应用模式。

参考资料

  1. https://github.com/kat-co/concurrency-in-go-src
  2. https://github.com/campoy/justforfunc/tree/master/27-merging-chans
  3. https://github.com/eapache/channels
  4. https://github.com/LK4D4/trylock
  5. https://stackoverflow.com/questions/36391421/explain-dont-communicate-by-sharing-memory-share-memory-by-communicating
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章