译 | 如何优雅地关闭 Go 中的工作 goroutine

原文: Go - graceful shutdown of worker goroutines

在这篇博文中,我们将看看 Go 程序的优雅关闭。这类 Go 程序有一些执行任务的工作 goroutine,要求在程序关闭之前,这些工作 goroutine 必须完成任务。

介绍

在一个最近的项目中,我们有一个使用场景:一个基于 Go 的微服务不断地消费另一个第三方库发出的事件。这些事件在调用外部服务之前,会进行一些处理。而外部服务处理每个请求的速度都相当慢,但另一方面,它能够处理许多并发请求。因此,我们实现了一个简单的 worker 池,将输入事件扇出为几个并发执行的 goroutine。

总的来说,它看起来像这样:

然而,我们需要保证在该微服务关闭的时候,当前任何正在运行的对外部服务的请求必须完成,并且请求结果在我们的内部后端持久化。

worker 池和终止信号处理

worker 池模式 是一个有名的关于 worker 池的 Go 模式。此外,还有大量关于如何进行 基于 SIGTERM 通知的优雅关闭 的例子。但我们意识到,我们的一些需求使得使用场景有点更复杂。

当程序接收到 SIGTERM 或者 SIGINT 信号(例如,因容器编排器缩容到一定数目的副本数而产生的)时,在终止整个程序之前,必须允许当前任何工作中的 worker goroutine 完成它们长期运行的工作。

让事情稍微复杂些的是,我们对生产者端的库没有任何控制权。一开始我们会注册一个回调函数,每当生产端的库有了一个(我们需要的)事件,就会调用这个回调函数。该库会处于阻塞状态,直到回调函数结束执行。然后,当有更多事件产生时,库会再次调用这个函数。

worker-pool 的诸多 goroutine 通过使用标准的“对 channel 进行 range 操作”结构,来不断处理事件,例如:

func workerFunc() {
    for event := range jobsChan { // 阻塞直到接收到一个事件,或者该 channel 被关闭。
        // handle the event...
    }
}

这意味着,让一个 worker “结束”最干净的方式是关闭名为 “jobsChan” 的 channel。

在生产者端进行关闭

你首先学到的关于在 Go 中关闭 channel 的第一件事情之一是,如果向已关闭的 channel 发送数据,程序就会 panic。这归结于一个非常简单的规则:

“总是在生产者端关闭一个 channel(Always close a channel on the producer side)”

不管怎样,什么是生产者端呢?嗯,一般是那个将事件发送到 channel 里的 goroutine

func callbackFunc(event int) {
	jobsChan<-event
}

上面是我们的回调函数 callbackFunc,我们将其注册到外部库中,外部库就会将事件传给我们。 (为了让这些例子简单些,我将真实的事件替换为一个简单的整形,以作为负载。)

你要如何 安全地 保护上面的代码免于给已关闭的 channel 发送数据呢?一路沿着 Mutex、布尔型标志和 if 语句以确定是否一些_其他_ goroutine 关闭了 channel,以及控制是否应该允许发送数据,这并不简单。多留心潜在的竞争条件和不确定行为。

我们的解决方法是引入一个中间 channel 和一个内部的“消费者”,后者作为回调和任务 channel 之间的代理:

消费者函数看起来像这样:

func startConsumer(ctx context.Context) {
    // Loop until a ctx.Done() is received. Note that select{} blocks until either case happens
    for {
            select {
            case event := <-intermediateChan:
                jobsChan <- event
            case _ <- ctx.Done():
                close(jobsChan)
                return             // exit this function so we don't consume anything more from the intermediate chan
            }
    }
}

好了,等下。这个 “select” 和 “ctx.Done()” 是啥?

恕我直言, select 语句是 Go 最神奇的东西之一。它允许多个 channel 的等待和协同。在这种情况下,我们或者会从中间 channel 那里收到事件,然后将其传到 jobsChan,又或者会从 context.Context 接收到取消信号。

关闭 jobsChan 之后的 return 语句将让我们离开 for 循环和函数,这确保了 不会有新事件被传递给 jobsChan,并且不会从 intermediateChan 消费到 任何事件

所以,要么是传递事件到 jobsChan(worker 从这里消费),要么在作为生产者的 同一个 goroutine 中 关闭 jobsChan。

关闭 jobsChan 意味着消费端的所有 worker 将会停止遍历 jobsChan:

for event := range jobsChan { // <- on the close(jobsChan), all goroutines waiting for jobs here will exit the for-loop
    // handle the event...
}

发出取消信号

等待 Go 程序退出是一种有名的模式:

func main() {
    ... rest of program ...
    
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
    <-termChan // Blocks here until either SIGINT or SIGTERM is received.
    // 接下来呢?
}

在 “接下来呢?” 这一部分,捕获到 SIGINT 或者 SIGTERM 后,主 goroutine 恢复执行。我们需要告诉将事件从 intermediateChan 传到 jobsChan 的消费者,跨 goroutine 边界关闭 jobsChan。

再次,使用 Mutex 和条件语句来解决这个问题,技术上是可行的,但是相当难搞并且容易出错。作为替代,我们会利用前面提及的 context.Context 的取消支持。

func main() 的某个地方,我们设置了一个带取消支持的根 background context:

func main() {
    ctx, cancelFunc := context.WithCancel(ctx.Background())
    // ... some omitted code ...
    
    go startConsumer(ctx) // pass the cancellable context to the consumer function
    
    // ... some more omitted code ...
    <-termChan
    
    cancelFunc() // call the cancelfunc to notify the consumer it's time to shut stuff down.
}

这就是 < -ctx.Done() 这一 select case 如何被调用的,它开始优雅拆卸 channel 和 worker。

使用 WaitGroup

上面这个方法只有一个问题:调用 cancelFunc() 后,程序会立即退出,这意味着,正在动态调用中的工作 goroutine 将没有时间执行完毕,这使得我们系统中的处理有可能处于中间态。

我们需要停止关闭,直到所有的 worker 都报告说它们完成了工作。现在,我们进入 sync.WaitGroup ,它允许我们等待任意数目的 goroutine 结束!

当启动 worker 时,我们传递一个指向在 func main() 中创建的 WaitGroup 的指针:

const numberOfWorkers = 4

func main() {
    // ... omitted ...
    wg := &sync.WaitGroup{}
    wg.Add(numberOfWorkers)
    
    // Start [workerPoolSize] workers
    for i := 0; i < workerPoolSize; i++ {
        go workerFunc(wg)
    }
    
    // ... more omitted stuff ...
    
    <-termChan    // wait for SIGINT / SIGTERM
    cancelFunc()  // send the shutdown signal through the context.Context
    wg.Wait()     // program will wait here until all worker goroutines have reported that they're done
    fmt.Println("Workers done, shutting down!")
}

这会稍微改变我们的 worker 启动函数:

func workerFunc(wg *sync.WaitGroup) {
    defer wg.Done() // Mark this goroutine as done! once the function exits
    for event := range jobsChan {
        // handle the event...
    }
}

wg.Done() 将 waitgroup 减一,一旦内部计数器变成 0,那么主 goroutine 将继续执行 wg.Wait() 之下的语句。这就完成了优雅关闭!

运行

最终程序的源代码在下一个部分。在此其中,我添加了一些日志,这样就能看看该过程发生了什么。

下面是一个带有 4 个工作 goroutine 的程序的执行输出,这里,我使用 Ctrl+C 来停止程序:

$ go run main.go 
  Worker 3 starting
  Worker 2 starting
  Worker 1 starting
  Worker 0 starting
  Worker 3 finished processing job 0
  Worker 0 finished processing job 3
  ^C*********************************     <-- HERE I PRESS CTRL+C
  Shutdown signal received
  *********************************
  Worker 3 finished processing job 4
  Worker 2 finished processing job 1
  Worker 1 finished processing job 2
  Consumer received cancellation signal, closing jobsChan!   <-- Here, the consumer receives the <-ctx.Done()
  Worker 3 finished processing job 6
  Worker 0 finished processing job 5
  Worker 1 finished processing job 8
  Worker 2 finished processing job 7
  Worker 0 finished processing job 10
  Worker 0 interrupted                    <-- Worker 0 has finished job #10, 3 left
  Worker 2 finished processing job 12
  Worker 2 interrupted                    <-- Worker 2 has finished job #12, 2 left
  Worker 3 finished processing job 9
  Worker 3 interrupted                    <-- Worker 3 has finished job #9, 1 left
  Worker 1 finished processing job 11
  Worker 1 interrupted                    <-- Worker 1 has finished job #11, all done
  All workers done, shutting down!

有人可能会观察到,消费者接收到 < -ctx.Done() 的时间点实际上是不确定的,这是因为 Go 运行时调度 channel 上的通信到 select 语句的方法。Go 规范是这样说的:

“如果可以处理一个或多个通信,那么选择进行处理的那个 chanel 是通过统一的伪随机选择的。(If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection)”。

这就是为什么即使在按下 CTRL+C 之后,任务也可以被传给 worker。

另一个特别的事情是,似乎即使在关闭了 jobsChan _之后_,任务(任务 9-12)还是被传给 worker 了。恩,它们实际是在该 channel 被关闭 _之前_ 被传给 worker 的。这个现象会发生是因为我们使用了一个带有 4 个“槽” 的缓存 channel。这意味着,假定我们第三方生产者以比我们的 worker 可以处理的速度更快地不断传递新事件,如果所有四个 worker 都从 channel 中消费了一个任务并且处理它们,那么该 channel 里就可能会有四个新的事件正等待被消费。关闭 channel 并不会影响那些已经缓存到 channel 里的数据 —— Go 允许消费者消费它们。

如果我们将 jobsChan 修改为无缓存的:

jobsChan := make(chan int)

然后再次运行:

$ go run main.go
.... omitted for brevity ....
^C*********************************
Shutdown signal received
*********************************
Worker 3 finished processing job 3
Worker 3 started job 5
Worker 0 finished processing job 4
Worker 0 started job 6
Consumer received cancellation signal, closing jobsChan! <-- again, it may take some time until the consumer is handed <-ctx.Done()
Consumer closed jobsChan
Worker 1 finished processing job 1     <-- From here on, we see that each worker finishes exactly one job before being interrupted.
Worker 1 interrupted
Worker 2 finished processing job 2
Worker 2 interrupted
Worker 0 finished processing job 6
Worker 0 interrupted
Worker 3 finished processing job 5
Worker 3 interrupted
All workers done, shutting down!

这一次,在 channel 关闭后,我们就没有看到任何“不期望的”任务被 worker 消费了。然而,让 channel 缓存跟 worker 数相同的数据,是在不必要拖慢生产端的情况下,让 worker 保持处理数据的常见优化手法。

完整的程序

上面的代码片段在某些地方进行了简化,以使得它们尽可能简洁。带有某些结构以封装和模拟第三方生产者的完整程序如下:

package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

const workerPoolSize = 4

func main() {
	// 创建消费者
	consumer := Consumer{
		ingestChan: make(chan int, 1),
		jobsChan:   make(chan int, workerPoolSize),
	}

	// 模拟外部库:每秒发送 10 个事件
	producer := Producer{callbackFunc: consumer.callbackFunc}
	go producer.start()

	// 设置取消 context 和 waitgroup
	ctx, cancelFunc := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	// 传递取消 context 以启动消费者
	go consumer.startConsumer(ctx)

	// 启动 worker,并添加 [workerPoolSize] 到 WaitGroup
	wg.Add(workerPoolSize)
	for i := 0; i < workerPoolSize; i++ {
		go consumer.workerFunc(wg, i)
	}

	// 处理终止信号,并等待 termChan 信号
	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)
	
	<-termChan         // 这里阻塞直到接收到信号

	// 处理关闭
	fmt.Println("*********************************\nShutdown signal received\n*********************************")
	cancelFunc()       // 向 context.Context 发送取消信号
	wg.Wait()          // 这里阻塞直至所有 worker 完成
	
	fmt.Println("All workers done, shutting down!")
}

消费者结构:

// -- 从这里起,下面是 Consumer!
type Consumer struct {
	ingestChan chan int
	jobsChan   chan int
}

// 每当外部库传递一个事件给我们,就会调用 callbackFunc。
func (c Consumer) callbackFunc(event int) {
	c.ingestChan <- event
}

// workerFunc 启动一个 worker 函数,它会遍历 jobsChan,直到该 channel 关闭。
func (c Consumer) workerFunc(wg *sync.WaitGroup, index int) {
	defer wg.Done()

	fmt.Printf("Worker %d starting\n", index)
	for eventIndex := range c.jobsChan {
		// 模拟工作执行 1 ~ 3 秒
		fmt.Printf("Worker %d started job %d\n", index, eventIndex)
		time.Sleep(time.Millisecond * time.Duration(1000+rand.Intn(2000)))
		fmt.Printf("Worker %d finished processing job %d\n", index, eventIndex)
	}
	fmt.Printf("Worker %d interrupted\n", index)
}

// startConsumer 作为 ingestChan 和 jobsChan 之间的代理,使用 select 语句以支持优雅关闭。
func (c Consumer) startConsumer(ctx context.Context) {
	for {
		select {
		case job := <-c.ingestChan:
			c.jobsChan <- job
		case <-ctx.Done():
			fmt.Println("Consumer received cancellation signal, closing jobsChan!")
			close(c.jobsChan)
			fmt.Println("Consumer closed jobsChan")
			return
		}
	}
}

最后,模拟外部库的生产者结构:

// -- Producer 模拟一个外部库,每 100ms 有新数据时simulates an external library that invokes the 
// 它会调用注册的回调函数。
type Producer struct {
    callbackFunc func(event int)
}
func (p Producer) start() {
    eventIndex := 0
    for {
        p.callbackFunc(eventIndex)
        eventIndex++
        time.Sleep(time.Millisecond * 100)
    }
}

总结

我希望这篇小博文提供了一个简单的例子,说明了基于 goroutine 的 worker 池,以及如何使用基于 context 的取消、WaitGroup 和生产端 channel 关闭,来优雅关闭这些 goroutine。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章