基于 DataX 数据库基础表数据同步

万标

2015年加入去哪儿技术团队。目前在金融事业部/支付中心,测试工程师岗位,对技术有浓厚兴趣。

一、 问题及背景

在此之前,新增的基础数据在上线后,是通过手工在各测试环境数据库执行 sql 语句的方式来维护的。另外,测试环境基础数据由于测试场景需要被修改,而修改后很有可能不会被恢复到原始值,这样的话可能会导致他人得到的测试结果与期望不一致,需要花不少时间定位原因然后再手动还原配置数据。很显然这种维护方法缺陷效率低下。

测试环境的维护是测试工作中提高测试效率很重要的一个环节。我们的测试环境大概经历这么三个阶段,从最初的维护 N 套环境的配置,到一键部署只需要维护一套配置,再到 Noah (Noah 介绍移步 http://mp.weixin.qq.com/s/Xit7Ohnb4Lhs16FqzNhiGA)整个环境创建、服务部署和基础数据同步全流程的服务,环境的维护成本逐步降低。测试环境基础数据同步是在一键部署阶段诞生的。 一键部署解决了多套环境配置信息的集中管理、数据库安装和服务部署等问题,但是对于数据库表结构和基础数据的维护没有很好的解决。当时支付中心跟 dba 合作解决了一键部署环境表结构同步的问题,那么剩余的基础表数据同步问题,我们使用一个最简单的方案来解决,利用阿里开源的 DataX 来进行数据库表数据的同步,在此基础上增加一个配置任务管理界面。

二 、解决方案

2.1 D ataX 介绍

DataX 是一个开源数据同步框架,相对其他数据同步工具具有以下特点:

1)异构数据同步

DataX 将复杂的网状的同步链路变成了星型数据链路, DataX 作为中间传输载体负责连接各种数据源。

2)易扩展

DataX 将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件的设计让开发者极易扩展。

3)同步效率高

DataX3.0 每一种读插件都有一种或多种切分策略,都能将作业合理切分成多个 Task 并行执行,单机多线程执行模型可以让 DataX 速度随并发成线性增长。

4)可靠性高

支持所有的强数据类型,每一种插件都有自己的数据类型转换策略,可以保证数据完整无损的传输到目的端。提供供作业全链路的流量、数据量运行时监控,包括作业本身状态、数据流量、数据速度、执行进度等信息。以及实现脏数据精确过滤、识别、采集、展示,提供多种的脏数据处理模式。

5)速度控制

提供了包括通道(并发)、记录流、字节流三种流控模式,可以随意控制作业速度,使得作业在库可以承受的范围内达到最佳的同步速度。

6)容错机制

DataX 作业时有可能受外部因素的干扰,网络闪断、数据源不稳定等因素很容易导致同步作业被中断, DataX 提供多种重试策略。 更多介绍详见 https://github.com/alibaba/DataX ,下图是 DataX 官方提供的框架图:

2.2 方案实现

基于 DataX 的特性,我们很容易实现数据库的基础表数据的同步。开发一个基础数据任务管理系统(QDataManager)可以添加同步数据的任务,以及调用 DataX 执行同步数据任务。这样我们只需要维护一个基准库,然后利用基础数据任务管理系统定时或手工操作将基础表数据同步到各个测试环境的数据库中。

基础数据任务管理系统(QDataManager)包含添加任务,编辑任务,查询任务,手动执行和定时执行任务等功能。基础数据任务管理系统主页面如下图:

基础数据同步管理系统(QDataManager)添加任务时生成 DataX 所需的配置文件,同时将任务信息保存到同步任务数据库中,每一个任务对应某一环境某张表。 基础数据同步管理系统(QDataManager)结构图如下:

举个例子添加一个同步 Z 环境的账务库科目表的任务, 生成的配置信息 subject_info.json 文件信息如下:

{
	"job": {
		"content": [{
			"reader": {
				"name": "mysqlreader",
				"parameter": {
					"column": ["*"],
					"connection": [{
						"jdbcUrl": ["jdbc:mysql://ip:port/pay_accounting"],
						"table": ["subject_info"]
					}],
					"password": "passw",
					"username": "user",
					"where": ""
				}
			},
			"writer": {
				"name": "mysqlwriter",
				"parameter": {
					"column": ["*"],
					"connection": [{
						"jdbcUrl": "jdbc:mysql://ip:port/pay_accounting",
						"table": ["subject_info"]
					}],
					"password": "passw",
					"preSql": ["truncate table subject_info"],
					"session": ["set session sql_mode='ANSI'"],
					"username": "user",
					"writeMode": "insert"
				}
			}
		}],
		"setting": {
			"speed": {
				"channel": "1"
			}
		}
	}
}

相关参数说明:

1. reader 节点,基础库的相关配置信息

column:所配置的表中需要同步的列名集合,使用 JSON 的数组描述字段信息。用户使用*代表默认使用所有列配置;

jdbcUrl:基础数据库的 JDBC 连接信息;

table:指定需要同步的表;

where:筛选条件, MysqlReader 根据指定的 column 、table 、 where 条件拼接 SQL ,并根据这个 SQL 进行数据抽取。利用 where 条件可以进行数据增量同步。如果不填写 where 语句,包括不提供 where 的 key 或者 value ,将同步全量数据;

2. writer 节点,目标库的相关配置信息

table:指定待同步的表名称,必须保证基础库表结构一致;

session:在获取 Mysql 连接时,执行 session 指定的 SQL 语句,修改当前 connection session 属性;

preSql:写入数据到目的表前,会先执行这里的 sql 语句;

writeMode:控制写入数据到目标表采用 insertinto 或者 replaceinto 语句;

三、使用情况

目前在支付中心每天定时同步测试环境基础数据,从手动同步基础数据解放出来,也避免了由于基础数据被修改排查问题耗费掉宝贵的测试时间,从而提升测试工作效率。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章