上海久耶基于 HBase 实时数仓探索实践

本文根据上海久耶大数据研发工程师武基鹏在中国 HBase 技术社区第四届 MeetUp 上海站中分享的《基于 HBase 实时数仓探索实践》编辑整理而成。

文章从六个方面介绍,首先是久耶第一代离线数仓以及第二代实时数仓。接下来介绍下公司业务场景和业务开发,基于 HBase 的开发流程,然后介绍下公司 CDH 集群,介绍下 CDH 集群调优监控。最后分享两个生产案例。

第一代离线数仓是在去年三月份上线,主要是基于 OMS 和 WMS,由于分库分表,大约有十几个库。前期通过 SQOOP 进行数据抽取,后来由于 SQOOP 的一些问题采用了阿里开源的 DataX,时间粒度使用脚本调度实现 T+1 模式抽取到 Hive。作分析时采用 Apache 的 Kylin,将数据直接存入 HBase 中,最初数仓建立用于数表查询,用的是 Superset,后面也用了 Saiku。六个月就被淘汰,因为离线数仓有个问题,业务部分要查询数据,但是你的时间粒度是昨天的,业务部分需要看当天一段时间内的数据,离线是无法满足。

接下来在另一个集群构建实时数仓,其选型主要有两个,第一个是实时,实时采集利用 Maxwell,直接采集公司数据库 MySQL,将数据直接以 json 格式发送到 kafka,数仓存储选型是 HBase。

为什么选择 Maxwell 呢,第一个它能够使用“select * from Table”进行 bootstrapping 初始化数据,在大数据构建时可以利用 Maxwell 进行全表扫描,这句 SQL 会自动触发 Maxwell 某个线程进行数据拉取。第二个 Maxwell 支持断点还原功能,大数据平台架构不光考虑到高可靠、高性能,也要保证数据零丢失,它支持记录 MySQL 的 post 日志进行数据还原,这是当初选择最重要的原因。第三个 Maxwell 将数据从 MySQL 发送到 Kafka,Kafka 是分区的,如何保证全局有序是个问题。它能保证这个特性,支持 database, table, primary key, or column 的拼接,将数据发送到某个分区;比如一条业务数据在业务系统先在 insert 再做 update 再做 delete,Kafka 会将这三者发送到三个分区,key 值为空不会记录,在销毁时用 sparkstreaming 可能会以 delete、update、insert 顺序,会造成数据紊乱。我们希望将这些特征数据发送到 Kafka 一个分区,而 Kafka 单分区是有序的。第四个 Maxwell 也会将这些数据发送到后端,当业务数据的表需要升级,如加索引、加字段,可以通过 alert 语句解析捕获,进行同步更新到 HBase 中。因此基于这四点要求选择了 Maxwell,没有选择当前其他开源产品。

接下来讲一下为什么选择 HBase 而不选择 pudu 等产品。第一个是 HBase 是分布式、可缩的。第二个是随机的读和写,第三个 HBase 支持百万列。第三个介绍下为什么要选择 Phoenix,首先原因是支持 SQL,利用原生 HBase 进行查询、代码分析比较吃力。第二个我们构建的表是盐表,能够解决热点问题,避免一个节点很繁忙另一个节点很闲。第三 Phoenix 支持二级索引,由于表是盐表(分区),索引也是分区的。第四个支持 Spark,可以直接将表传入 Phoenix 而不用通过 HBase,有利于传统开发人员转型,而不用专注于底层 HBase。

基于 CDH HBase 版本构建 Phoenix 版本历程,phoenix-for-cloudera-4.9、HBase-1.2、cdh5.9,这个存在问题,然后采用 apache-phoenix-4.11.0、HBase-1.2,最后采用 phoenix-for-cloudera-4.10、HBase-1.2、cdh5.12。cdh5.11 的邮件配置存在 bug。

进行编译的原因是去年 Phoenix 官方是不支持 CDH 版本,目前是支持的。编译时将 pom 文件,改为 CDH 支持,然后改生产需要的 Spark 版本。修复 SYSTEM.MUTEX 表在分布式的计算时,多次创建错误。QueryServicesOptions.java 文件修改参数 DEFAULT_IS_NAMESPACE_MAPPING_ENABLED=true。Phoenix 存在一个问题就是时区,比如一条上午十点的业务数据在 Phoenix 周转下,时间数据会减一个 8 小时。修改 DateUtil.java 文件 timezone 为”Asia/Shanghai”,但是读写两种只解决了一种,而业务代码开发需要经过 Phoenix 架构 JDBC,数据还是会出错,上面只解决了查询,后来采用下面改动,然后编译。

上图是实时数仓架构图,主要的存储层还是以 HBase 为主。第一层业务系统数据库在阿里云平台上,有 OMS、WMS,Report DB 是 OMS 和 WMS 的重复,将里面的数据全部同步于一台机器,使用的就是 Maxwell,其支持白名单和黑名单。业务平台的表可能有两三百个,大数据平台的计算可能只需要 100 多个,可以添加白名单,有些表的数据就可以不用过来了。这些数据通过 Json 发送到 Kafka,然后通过 Spark streaming 去消费 Kafka,通过 JDBC 写入 HBase。表不是通过 Phoenix 语句创建,不关心底层 HBase,只需要通过 Phoenix 像 MYSQL 一样查询即可。同时会将计算结果存储到 Redis,久耶慧策应用也会将数据写入 ES 里面。中间一层就是常见应用开发,如 Spark Streaming、Spark SQl,也用 Python 和 R 语言。调度平台起先用的是 Azkaban,然后是 Airflow,最后用的是 Oozie。上图蓝色是实时大屏,红色是全球仓库,大约有四十几个,数据绑定用的是 saiKU,将 Phoenix 架包集成进去,saiKU 分上卷和下卷,业务人员依据自己的需求去拿行和列数据,saiKU 通过 Phoenix 组装 SQL 语句查询结果数据。也用到 zeppelin,这是 Spark 交互式开发必须用到的。

接下来讲一下数据仓库,首先是模型建设,第一层是基础表,在 Phoenix 中建立与 MySQL 一样的表。在基础表的基础上构建事实表(订单实时发生的表)和维度表(如中国有多少省多少市等更新不是很大的表),依据事实表和维度表进行代码开发,构建领域表,就是依据业务需求得出的结果存到领域表。数据校验是通过数据量比对,起先是在重库时做触发器,但是 MySQL 重库触发器支持不友好。通过改造 Phoenix 代码将数据写入 Redis,增加加一删除减一,MySQL 数据和 HBase 数据是一天一查一对比,当不相等直接调用 shell 脚本进行全表扫描。当前只采用 OMS、WMS 的库,QPS 处于 2000,1 条数据: 平均 60 列 495b。

业务场景开始是业务报表开发,有客诉妥投、ABC 订单、商业季度等。也提供一个 BI 自助分析,第三个就是双十一大屏和龙虎榜,同时使用了 BMS 系统,是一个商业结算系统。第五个是今年做的领导层和客户层的慧策,商业决策分析。

业务开发套路就是依据业务需求将数据存在那些表里面,需要将构建表的语句提取出来构建 Phoenix Table,然后 Kafka+Spark Streaming+Phoenix 进行数据的插入。接着就是 Spark 开发读和写,我们还利用了 DBeaver。我们建表使用了联合组件,由于公司集群规模不是很高,regionServer 是 38 台,COMPRESSION 是使用 SNAPPY,这是依据压缩比、解压性能。

接下来是一个经典开发案例 Kafka+Spark Streaming+Phoenix,Phoenix 可以理解为 MySQL 架包的 JDBC。我们并没有使用 Phoenix 的 Pool 池,官方也推荐使用正常 JDBC 文件,因为 JDBC 已经支持长连接,foreachPartition 拿到 Phoenix 的 JDBC,中间进行常见数据处理,Kafka 接收过来数据是 Json 格式,如何将其转化为 Phoenix 的 upset 语法和 delete 语法,完成后就将连接关闭。

数据流入 Phoenix 大数据平台是通过 bootstream 的全表扫描,其增量数据也是实时进入。业务代码开发首先将架包导入 pom 文件,如何找维度是将 Phoenix 的 Apache 下载到 IDEA,在测试类里面查找。Phoenix+Spark 读取有好几种,选择以上写法原因有:首先其支持列裁剪,第二支持 where 条件,configuration 指的是 Spark 的 HDFS 的 conf。

业务开发是多张表,Spark 表是 df,接下来就和 Phoenix 和 HBase 无关。接下来就是对接 Spark 业务开发逻辑处理,最后结果集会回写到 HBase 中。还是通过 Phoenix 写入,有追加、overwrite。HBase 没有很好地可视化工具,利用 DBeaver,支持 MYSQL、Oracle 等所有数据库类型,也支持二次开发借助于接口实现。

接下来介绍下集群调优参数,分为六个方面:(1)Linux parameters、(2)HDFS parameters、(3)HBase parameters、(4)GC parameters、(5)Monitor、(6)Bug。句柄数、文件数、线程数这些都是要调,因为 regionserver 在操作时需要 open file,处理时需要用到一些线程,一些系统都是架设在 Linux 上,因此集群调优都需要调它。需要注意的是改完后需要检查是否生效,立即生效是 sysctt-p。Spark 开发需要将数据频繁的写入 HBase 中,HBase 底层是 HDFS,在写入时就会出现问题,最后发现 Linux 系统参数没有调。

在正常的 HBase 节点机器上,swap 是设置为 0,这并不是禁用 swap 而是其惰性是最大的。由于我们公司由于业务系统较多,吃的内存比较紧,因此设为 10,这样可以使 job 慢一点但是不能挂,但是如果做实时就需要设置为 0。这个最终设置取决于你们自身业务环境,选择自己需要的就好。如果做 CBH 的平台部署必须要关闭大页面。

接下来分享一个有意思的参数 HDFS Parameters,正常调优是 CBH 界面打开、HBase 的 xml 文件打开。主要调优是 timeout 和 handler 参数,将其几倍放大,socket.timeout 在 HBase 的 xml 文件一定要部署,否则无法支持高并发操作。

当一个本机线程无法创建一个本机线程,这段代码打在 HDFS 的 dataload,当时 dataload 的内存配置是 8G,实际只使用 1G,这个时候就休要加上 echo “kernel.threads-max=196605”->/etc/sysctl.conf,echo"kernel.pid_max=196605"->/etc/sysctl.conf,echo “vm.max_map_count=393210”-> /etc/sysctl.conf 三个参数,这其实是底层 Linux 抛出的错误。提醒一点 socket.timeout 参数不仅在 HDFS 中需要配置,在 HBase 中也需要配置。

GC 是 regionserver 配置,但是配置是 CDH 配置,GC 默认垃圾选择器是 CMS,需要将其改为 GE,如果需要配置可以去尝试下,小米以前分享过。可以对参数进行调试进行压错调优,尤其大数据平台开发尤其如此。

项目上线需要做监控,第一个就是 HBase 的读和写,绿色是写,但是读存在两个波峰,因为我们的调度平台以一个小时将所有 job 调度完。图中 Y 轴是每秒的请求量,如果写的量上来了或者读的波峰没有规律,就有可能是集群宕了。

第二个监控的指标是 FDS,就是 Regionserver 的文件句柄数,如果请求很多,句柄数会很高,因为其底层依赖于 Linux,如果超过 Linux 设置值机器容易夯住下线,导致 CPU 不正常,这时需要后台强制机器下线。然后需要监控 Zookeeper,监控的是 Zookeeper Open Connections,因为 HBase 进行操作需要打开的连接,当业务场景为长服务,如 Spark streaming 一直运行,先前尝试用 SparkSQL+Phoenix 做一个长服务,因为调度都是通过 shell 脚本调度,在资源紧张时需要抢资源,在 submit 时需要申请资源(大约 30S),线上是不允许的。最后采用 Spark streaming+Spark SQL+Phoenix JDBC,Spark streaming 是实时的每隔一小时判断进行数据处理,这个时候 Zookeeper Open Connections 就随着递增趋势上涨,当到 Connection 数(默认 500)CDH 会杀掉。后来改为水平,利用 PHOENIX-4319:Zookeeper connection should be closed immediately 解决问题。

接下来讲一下 Kafka 如何做监控,其实只需要上面一幅图,上图绿色指标读,Received 是蓝色线,相当于生产者写到 Kafka 里面,绿色是 Spark streaming 进行消费,相当于 Fetched。这幅图相当于实时同步架构,消息没有做积压。但是为什么波峰会比它高,原因是数据通过 Maxwell 发送到 Kafka 时是一个 Json 数据,但是 Kafka 消费时需要额外加一些东西(来自哪个 topic、offset 是什么等),如果两条线没问题就是没出问题。

Bug 方面,PHOENIX-4056:java.lang.IllegalArgumentException: Can not create a Path from an empty string,先前有问题采用降版本,目前已经解决,方案在社区里有。SPARK-22968:java.lang.IllegalStateException: No current assignment for partition kssh-2,这个是 Sparkstreaming 读 Kafka 时抛出的错误,这个在 Spark2.4.0 有新的补丁。

接下来分享两个案例,分为两种,一种是 3 次 RIT,园区断电机器挂掉出现 RIT。HBase 有个 WAL,数据基本不会丢,只需要将机器重启。重启过程会有一些 RIT 操作,如果 regionserver 挂了申请维护时间,尝试重启 regionserver 节点,如果不行重启 HBase 集群,这个时候需要看 HBase 的 master 的 active 的 log 日志。还有一次是高并发内存不够用,regionserver 挂掉,重启后在 CDH 的 HBase 运行正常,但是在监控页面 HBase 还是异常,这时候只需要将 CMS 的 serviceMonitor 重启就 OK。第三次 RIT 事故 regionserver 挂掉,尝试使用 HBCK 命令修复问题还是很多。最后通过日志分析发现 Hlog 有问题,通过 HDFS 命令将文件移到某个地方,重启就 OK 了。丢失的数据通过 Maxwell 恢复,预估事故发生点通过全表扫描进行恢复。

接下来分享一个三支烟的故事,数据来源于阿里云,自建机房需要通过 VPN 将数据拉倒本地机房。双十一所有仓库都在运作,MySQL 机器扛不住导致延迟比较大,延迟约半个小时。需要在 T2 将数据完全恢复,解决方案直接将 Maxwell 架设到阿里云进行实时同步,数据进行全表扫描,只需要扫描大屏显示需要的数据,将 T1 到 T2 的数据进行 SparkSQL,将计算结果写到 redis 里面,Sparkstreaming 进行现场改,只判断 T2 流进的数据才会将 T2 的基础值进行累积计算,实时 Job 跑了 15 分钟数据就实时过来了。

作者介绍

武基鹏,上海久耶供应链管理有限公司大数据研发工程师。主要从事大数据平台产品的技术工作;负责设计、构建和优化基于 HDFS/HBase 的存储平台架构;负责提升 Hadoop/HBase 等集群的高可用性、高性能、高扩展特性;负责基于 Spark 开发及性能调优。

本文来自武基鹏在 DataFun 社区的演讲,由 DataFun 编辑整理。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章