微信开源 PhxQueue:高可用、高可靠、高性能的分布式队列

开源地址:https://github.com/Tencent/phxqueue

PhxQueue 是微信开源的一款基于 Paxos 协议实现的高可用、高吞吐和高可靠的分布式队列,保证At-Least-Once Delivery,在微信内部广泛支持微信支付、公众平台等多个重要业务。

消息队列概述

消息队列作为成熟的异步通信模式,对比常用的同步通信模式,有如下优势:

  1. 解耦:防止引入过多的 API 给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。

  2. 削峰和流控:消息生产者不会堵塞,突发消息缓存在队列中,消费者按照实际能力读取消息。

  3. 复用:一次发布多方订阅。

PhxQueue诞生背景

旧队列

微信初期使用的分布式队列(称为旧队列)是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦、缓存、异步化等能力。

旧队列以 Quorum NRW 作为同步机制,其中 N=3、W=R=2,刷盘方式采用异步刷盘,兼顾了性能和可用性。

新需求

随着业务发展,接入业务种类日益增多,旧队列逐渐显得力不从心,主要不足如下:

  • 异步刷盘,数据可靠性堪忧

对于支付相关业务,保证数据可靠是首要需求。

目前大多数分布式队列方案是以同步复制+异步刷盘来保证数据可靠性的,但我们认为需要同步刷盘来进一步提高数据可靠性。

  • 乱序问题

部分业务提出了绝对有序的需求,但 NRW 并不保证顺序性,无法满足需求。

另外旧队列还存在出队去重、负载均衡等其他方面的问题亟需改善。上述种种促使了我们考虑新的方案。

业界方案的不足

Kafka 是大数据领域常用的消息队列,最初由 LinkedIn 采用 Scala 语言开发,用作 LinkedIn 的活动流追踪和运营系统数据处理管道的基础。

其高吞吐、自动容灾、出入队有序等特性,吸引了众多公司使用,在数据采集、传输场景中发挥着重要作用,详见Powerd By Kafka。

但我们充分调研了 Kafka,认为其在注重数据可靠性的场景下,有如下不足:

Kafka 性能与同步刷盘的矛盾

Kafka 在开启配置 log.flush.interval.messages=1,打开同步刷盘特性后,吞吐会急剧下降。

该现象由如下因素导致:

  • SSD 写放大

业务消息平均大小在数 1k 左右。

而 SSD 一次刷盘的最小单位为一个 page size,大小为 4k。

当 Kafka 对大小不足 4k 的消息进行刷盘时,实际写入的物理数据量是消息大小的数倍。导致硬盘写带宽资源被浪费。

  • 业务场景下 Producer batch 效果不好

Kafka Producer batch,简单来说,就是把多个消息打包在一起发送到 Broker,广泛用于大数据场景。按道理,batch效果足够,是能抵消写放大的影响的。

但业务场景下的消息生产不同于大数据场景下的日志生产,每个需要入队的业务请求在业务系统中有独立的上下文,batch难度大。即使在业务和Broker之间加入代理层,将Producer转移到代理层内进行batch,也因代理层的节点数众多,batch效果难以提高,导致写放大无法抵消。

Kafka replica 同步设计上的不足

Kafka replica 同步设计概要:

Kafka Broker leader 会跟踪与其保持同步的 follower 列表,该列表称为ISR(即in-sync Replica)。如果一个 follower 宕机,或者落后太多,leader 将把它从ISR中移除。

该同步方式偏重于同步效率,但是在可用性方面表现略显不足:

  • Broker fail over 过程成功率下降严重

在3 replicas的场景下,leader 均匀分布在各 Broker 上,一个Broker出现故障,就意味着1/3的 leader、follower 离线,这时读写成功率下降:

  • 对于 leader 离线的 partition,暂时无法读写,需要等待 Controller 选举出新的 leader 后才能恢复;

  • 对于 follower 离线的 partition,也暂时无法读写,需要等待一定时长(取决于 replica.lag.time.max.ms,默认10s)后,leader 将故障 follower 从 ISR 中剔除才能恢复。

也就是说,任意一个 Broker 故障时,读写成功率会在一段时间内降为0。

  • 同步延迟取决于最慢节点

在同步复制场景下,需要等待所有节点返回ack。

通过对比 Kafka replica 与 Paxos 的表现, 我们认为在同步方式上 Paxos 是更好的选择

所以,我们基于旧队列,用 Paxos 协议改造了同步逻辑,并且进行了包括同步刷盘之内的多项优化,完成了 PhxQueue。

PhxQueue 介绍

PhxQueue 目前在微信内部广泛支持微信支付、公众平台等多个重要业务,日均入队达千亿,分钟入队峰值达一亿。

其设计出发点是高数据可靠性,且不失高可用和高吞吐,同时支持多种常见队列特性。

PhxQueue支持的特性如下:

  • 同步刷盘,入队数据绝对不丢,自带内部实时对账

  • 出入队严格有序

  • 多订阅

  • 出队限速

  • 出队重放

  • 所有模块均可平行扩展

  • 存储层批量刷盘、同步,保证高吞吐

  • 存储层支持同城多中心部署

  • 存储层自动容灾/接入均衡

  • 消费者自动容灾/负载均衡

PhxQueue 设计

整体架构

PhxQueue 由下列5个模块组成。

Store - 队列存储

Store 作为队列存储,引入了 PhxPaxos 库,以 Paxos 协议作副本同步。只要多数派节点正常工作及互联,即可提供线性一致性读写服务。

为了提高数据可靠性,同步刷盘作为默认开启特性,且性能不亚于异步刷盘。

在可用性方面,Store 内有多个独立的 paxos group,每个 paxos group 仅 master 提供读写服务,平时 master 动态均匀分布在 Store 内各节点,均衡接入压力,节点出灾时自动切换 master 到其它可用节点。

Producer - 生产者

Producer 作为消息生产者,根据 key 决定消息存储路由。相同 key 的消息默认路由到同一个队列中,保证出队顺序与入队顺序一致。

Consumer - 消费者

Consumer 作为消费者,以批量拉取的方式从 Store 拉消息,支持多协程方式批量处理消息。

Consumer 以服务框架的形式提供服务,使用者以实现回调的方式,根据不同主题(Topic),不同处理类型(Handler)定义具体的消息处理逻辑。

Scheduler - 消费者管理器(可选择部署)

Scheduler 的作用是,收集 Consumer 全局负载信息, 对 Consumer 做容灾和负载均衡。当使用者没有这方面的需求时,可以省略部署 Scheduler,此时各 Consumer 根据配置权重决定与队列的处理关系。

部署 Scheduler 后,Scheduler leader 与所有 Conusmer 维持心跳,在收集 Consumer 的负载信息的同时,反向调整 Consumer 与队列的处理关系。

当 Scheduler leader 宕机了后,Scheduler 依赖下述分布式锁服务选举出新 leader,不可用期间仅影响 Consumer 的容灾和负载均衡,不影响 Consumer 的正常消费。

Lock - 分布式锁(可选择部署)

Lock 是一个分布式锁,其接口设计非常通用化,使用者可以选择将 Lock 独立部署,提供通用分布式锁服务。

Lock 在 PhxQueue 中的作用有如下两点:

  1. 为 Scheduler 选举 leader;

  2. 防止多个 Consumer 同时处理一条队列。

Lock 同样也是可选择部署的模块:

  • 若部署了 Scheduler,就必须部署 Lock 为 Scheduler 选举出 leader;

  • 否则,若业务对重复消费不敏感,可选择不部署 Lock。

这里所指的重复消费场景是:若省略部署 Scheduler 的话,Consumer 需要通过读取配置得知可处理的队列集合;当队列有变更(如队列缩扩容)时,各 Consumer 机器上的配置改变有先有后,这时各 Consumer 在同一时间看到的配置状态可能不一样,导致一段时间内两个 Consumer 都认为自己该消费同一个队列,造成重复消费。Lock 的部署可以避免该场景下的重复消费。(注意,即使省略部署 Lock,该场景仅造成重复消费,而不会造成乱序消费)

Store 复制流程

PhxQueue Store 通过 PhxPaxos 协议进行副本复制。

PhxPaxos 的工程实现方式分为三层:app 层负责处理业务请求,paxos 层执行 paxos同步过程,状态机层更新业务状态。

其中,app 层发起 paxos 提议,paxos 层各节点通过 paxos 协议共同完成一个 paxos log 的确认,之后状态机以 paxos log 作为的输入作状态转移,更新业务的状态,最后返回状态转移结果给 app 层。

一致的状态机层,加上来自 paxos 层的一致输入,就产生一致的状态转移,从而保证多个节点强一致。

这里我们要基于 PhxPaxos 在状态机层实现一个队列,就需要作如下概念映射:

  • 队列这种模型不涉及数据修改,是有序的数据集合,和 paxos log 的定义很像,所以可以让入队的数据直接作为 paxos log,而状态机只需要保存 paxos log 序列。

  • instance id 的严格递增特性,使得它可以方便地作为队列偏移。

  • 队列中读偏移之前的数据,认为是可以删除的数据,这点和 check point 的定义一致。

整体上队列状态机和 paxos 能很好地切合。

Store Group Commit - 高效刷盘及副本同步

未经优化的 Paxos 协议并未解决同步刷盘的写放大问题。而且,其副本同步效率不如 Kafka。

原因是,Kafka 的副本同步是流式批量的,而 Paxos 协议是以 paxos log 为单位串行同步,每个 paxos log 的同步开销是 1个RTT + 1次刷盘。

在多DC部署的场景下,ping 时延可达4ms,这样会导致单个 paxos group 的理论最高 TPS 仅250。

我们采用多 paxos group 部署 以及 Group Commit 的方式来同时解决同步刷盘的写放大问题以及Paxos吞吐问题。

如上图, 我们部署多个paxos group,以 paxos group 作为 Group Commit 的单位,一个 paxos group 内对应多个queue,将多个queue在一段时间内入队的数据合并在一起,当等待耗时或积累数据数目达到阀值,才会触发一次Paxos同步和同步刷盘,等待期间前端阻塞。

与Kafka的Producer批量逻辑相比,在存储层以 Group Commit 进行批量合并的好处如下:

  1. 业务层无需关注如何组织请求进行批量;

  2. 在存储层以 paxos group 为单位的聚合效果比上层聚合效果更好。

PhxQueue 与 Kafka 对比

下面分别从设计、性能、存储层 failover 过程三方面对比 PhxQueue 与 Kafka。

设计对比

PhxQueue 架构虽然与 Kafka 等常见分布式队列类似,但设计上仍有不少独特之处。为了能让对 Kafka 有一定了解的读者更方便地了解 PhxQueue,下面列出了两者的对比。

注:以下对比基于相同的数据可靠性场景:少数派节点失效,不会造成数据丢失,且整体依旧可用。

性能对比

测试环境

CPU: 64 x Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Memory: 64 GB
Network: 10 Gigabit Ethernet
Disk: SSD Raid 10
Cluster Nodes: 3
Ping: 1ms

测试基准及配置

测试结果

开启 Producer Batch:

关闭 Producer Batch:

以上场景,PhxQueue 瓶颈在 cpu,使用率达70% ~ 80%。

小结

  1. PhxQueue 性能与 Kafka 持平;

  2. 相同 QPS 下,由于不用等最慢节点返回,PhxQueue 平均耗时比 Kafka 稍优;

  3. 关闭 Producer Batch 后,在同步刷盘场景下,PhxQueue 性能可达 Kafka 的2倍,原因是,PhxQueue 存储层在写盘前做了 batch,而 Kafka 没有,所以后者会有写放大。

存储层 failover 过程对比

主要对比杀死存储层的一个节点后,对整体吞吐的影响。

Kafka

表现:

  • Failover 期间,在不同阶段程度不同,入队成功率在0% ~ 33%;

  • Failover 持续时间由租约决定,租约时长默认10s。

测试过程:

将 replica.lag.time.max.ms 从 10s 调整为 60s(延长时间方便观察),然后 kill Broker 0,挑选3个 partition,观察 ISR 变化如下:

第一阶段(未 kill Broker 0):
 Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
 Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
 Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2

第二阶段(kill Broker 0 后持续8s):
 Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
 Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
 Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2

第三阶段(持续1分钟左右):
 Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
 Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
 Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0

第四阶段(至此入队成功率完全恢复):
 Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
 Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1
 Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1

其中,第二/三阶段标红处对应的partition入队成功率受损:

  • 第二阶段期间,Partition 96/97/98 均无法写入,入队成功率成功率下降至0%。

  • 第三阶段期间,Partition 96 可继续写入,但 Partition 97/98 无法写入,因为写入要等 Broker 0 回 ack,但 Broker 0 已 kill,入队成功率下降至33%。

而实际观察,第二/三阶段期间完全没吞吐,原因是压测工具不断报连接失败,停止了写入。

压测工具输出:

30551 records sent, 6107.8 records/sec (0.06 MB/sec), 1733.9 ms avg latency, 5042.0 max latency.
30620 records sent, 6117.9 records/sec (0.06 MB/sec), 1771.9 ms avg latency, 5076.0 max latency.
30723 records sent, 6123.8 records/sec (0.06 MB/sec), 1745.4 ms avg latency, 5009.0 max latency.
30716 records sent, 6127.3 records/sec (0.06 MB/sec), 1841.1 ms avg latency, 5299.0 max latency.
30674 records sent, 6133.6 records/sec (0.06 MB/sec), 1621.3 ms avg latency, 4644.0 max latency.
>>> kill Broker 0 here (入队成功率受损)>>>
10580 records sent, 123.4 records/sec (0.00 MB/sec), 1537.1 ms avg latency, 84236.0 max latency.  <<---吞吐下降严重
11362 records sent, 132.3 records/sec (0.00 MB/sec), 1658.3 ms avg latency, 84232.0 max latency.
11367 records sent, 132.3 records/sec (0.00 MB/sec), 1582.4 ms avg latency, 84228.0 max latency.
11236 records sent, 130.9 records/sec (0.00 MB/sec), 1694.2 ms avg latency, 84240.0 max latency.
11406 records sent, 132.8 records/sec (0.00 MB/sec), 1650.5 ms avg latency, 84233.0 max latency.

压测工具连接Broker失败日志:

[2017-08-16 15:38:22,844] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-16 15:38:22,859] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

原因分析:

Kafka Broker leader 是通过 Controller 选举出来的,ISR 列表是 leader 维护的。
前者的的租约是 Controller 定义的,后者的租约是 Broker 配置 replica.lag.time.max.ms 指定的。
所以,第二阶段持续时间较短,是 Controller 的租约时间决定的,第三阶段持续时间较长,是 replica.lag.time.max.ms 决定的。
当 Broker 0 被 kill 时,前者影响本来 Broker 0 是 leader 的 1/3 partitions 的入队成功率,后者影响 Broker 0 作为 follower 的 2/3 partitions 的入队成功率。

PhxQueue

表现:

  • Failover 期间,入队成功率仅下降至66%;

  • Failover 持续时间由租约决定,租约时长默认5s。

  • 开启 换队列重试特性(适合没有绝对顺序性要求的业务提高可用性)后,Failover 期间仍有90+%入队成功率。

测试过程:

将 Store master 租约时长从10s调整为60s(延长时间方便观察),然后kill store 0,观察某 Producer 入队成功率:

关闭换队列重试特性:

>>> kill store 0 here (入队成功率受损)>>>

-------------------------------------------
-- total:  192323
-- time(ms):       10015
-- qps:    19203.49
-- routine_sleep:  73.88%
-- retcode cnt     percent
-- -1      22097   11.49 <<--- 失败:连接失败
-- 0       125905  65.47 <<--- 成功:仍有66%成功率
-- 10102   44321   23.05 <<--- 失败:提示需要重定向到 master
-- usetime(ms)     cnt     percent
-- < 1             0       0.00
-- < 2             0       0.00
-- < 5             610     0.32
-- < 10            7344    3.82
-- < 20            18937   9.85
-- < 50            36067   18.75
-- < 100           6971    3.62
-- < 200           20239   10.52
-- < 500           59059   30.71
-- < 1000          30601   15.91
-- >= 1000         12495   6.50

>>> (入队成功率完全恢复)>>>

-------------------------------------------
-- total:  198955
-- time(ms):       10001
-- qps:    19893.51
-- routine_sleep:  98.00%
-- retcode cnt     percent
-- 0       198955  100.00 <<--- 成功:100%成功率
-- usetime(ms)     cnt     percent
-- < 1             0       0.00
-- < 2             2       0.00
-- < 5             5895    2.96
-- < 10            30830   15.50
-- < 20            65887   33.12
-- < 50            95403   47.95
-- < 100           753     0.38
-- < 200           185     0.09
-- < 500           0       0.00
-- < 1000          0       0.00
-- >= 1000         0       0.00

开启换队列重试特性:

>>> kill store 0 here (入队成功率受损)>>>

-------------------------------------------
-- total:  134752
-- time(ms):       10001
-- qps:    13473.85
-- routine_sleep:  77.43%
-- retcode cnt     percent
-- -202    14      0.01 <<--- 失败:超时
-- -1      2712    2.01 <<--- 失败:连接失败
-- 0       127427  94.56 <<--- 成功:仍有94%成功率
-- 10102   4572    3.39 <<--- 失败:提示需要重定向到 master
-- 10105   27      0.02 <<--- 失败:master 未选举出来
-- usetime(ms)     cnt     percent
-- < 1             0       0.00
-- < 2             4       0.00
-- < 5             3284    2.44
-- < 10            10704   7.94
-- < 20            22109   16.41
-- < 50            32752   24.31
-- < 100           4541    3.37
-- < 200           4331    3.21
-- < 500           11265   8.36
-- < 1000          19706   14.62
-- >= 1000         26056   19.34

>>> (入队成功率完全恢复)>>>

-------------------------------------------
-- total:  198234
-- time(ms):       10014
-- qps:    19795.69
-- routine_sleep:  94.36%
-- retcode cnt     percent
-- 0       198234  100.00 <<--- 成功:100%成功率
-- usetime(ms)     cnt     percent
-- < 1             0       0.00
-- < 2             0       0.00
-- < 5             3875    1.95
-- < 10            22978   11.59
-- < 20            53000   26.74
-- < 50            87575   44.18
-- < 100           6204    3.13
-- < 200           6468    3.26
-- < 500           11963   6.03
-- < 1000          5637    2.84
-- >= 1000         534     0.27

小结

  1. 在存储层 failover 过程中,PhxQueue 和 Kafka 的入队成功率均有一定时长的下降,PhxQueue 的入队成功率在66% ~ 100%,Kafka 的入队成功率在0% ~ 33%;

  2. PhxQueue 开启换队列重试特性后,failover 过程中入队成功率保持在90+%;

  3. PhxQueue 和 Kafka 均能自动切换 master,最终入队成功率完全恢复。

总结

PhxQueue 在存储层做了很多的努力:实现了 master 自动切换,且仍然保证线性一致,切换期间仍然高可用;保证了同步刷盘的吞吐,其性能不亚于异步刷盘;。

另外实现了大部分队列实用特性,例如出入队顺序一致、多订阅、限速、消息重放等,适用于各种业务场景。

目前 PhxQueue 已在微信内部大规模使用,也正式开源。

我们将保持 PhxQueue 开源版本与内部版本的一致,欢迎读者试用并反馈意见。

开源地址:https://github.com/Tencent/phxqueue

我来评几句
登录后评论

已发表评论数()