mysql一主多从以及使用springboot aop进行读写分离的搭建的正确姿式

介绍

这是一篇高度集中火力的生产环境中的mysql5.7一主多从以及结合spring boot进行读写分离的最全文档。主末笔者还会给出一个完整的基于spring boot的使用aop特性做完整项目读写分离的全代码与演示。

本文涉及技术点如下:

  • mysql5.7.30+版本
  • spring boot
  • AOP
  • haproxy
  • keep alive

1.目标

mySQL层1主挂多从

安装3台mysql服务器,三台服务分布如下:

mySQL master: 192.168.2.101

mySQL slaver1: 192.168.2.102

mySQL slaver2: 192.168.2.103

HAProxy层主备

对外以22306端口反向代理2个mySQL的slaver

ha master: 192.168.2.102

ha slaver: 192.168.2.103

Keepalived层主备以及虚出一个ip叫192.168.2.201来供应用层调用

应用层无感知虚ip下挂几台mysql

keepalived master: 192.168.2.102

keepalived slaver: 192.168.2.103

keepalived出来的vip:192.168.2.201

设计spring boot应用程序框架内内置aop切片根据service层方法名决定读写路由

设计一个spring boot,用aop+auto config来实现根据xxx.service.方法名,自动让应用层所有的get/select方法走slaver, add/update/del/insert/delete等方法走master。

该框架内可以实现至少2个基于druid的jdbc数据源,一个叫master, 一个叫slaver。数据库层读写分离需要做到程序员无感。

2.安装三台mySQL服务器

centos7.x(此处我用的是centos7.4)安装mysql5.7过程

第一步,安装mysql5.7 yum源:

wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm
yum localinstall mysql57-community-release-el7-8.noarch.rpm

第二步,验证yum源是否正确:

yum repolist enabled | grep "mysql.*-community.*”

第三步,安装mysql:

yum install mysql-community-server

第四步,设置开机启动

systemctl enable mysqld
systemctl start mysqld

把3台mysql先按照上述方式全部装完,正常启动后

第五步,设置远程root登录

grep 'temporary password' /var/log/mysqld.log

我们可以看到,这边有一个随机密码在:is generated for root@localhost:后,这个是mysql5.7开始在安装后默认生成的一个初始root密码,我们把它复制一下。

然后用它来登录我们刚装好的mysql

mysql -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED BY '111111';

注意:mysql5.7默认安装了密码安全检查插件(validate_password),默认密码检查策略要求密码必须包含:大小写字母、数字和特殊符号,并且长度不能少于8位。否则会提示ERROR 1819 (HY000): Your password does not satisfy the current policy requirements错误,如下图所示:

因为是演练环境,我们可以暂时让我们的mysql5.7的root密码变得简单,因此我们先让这个默认的密码策略变得极其简单

set global validate_password_policy=0;

set global validate_password_length=0;

接下去我们继续添加远程用

ALTER USER 'root'@'localhost' IDENTIFIED BY '111111';

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '111111' WITH GRANT OPTION;

FLUSH PRIVILEGES;

把三台mySQL都按照上述步骤设好了,然后拿mySQL客户端都验证一下可登录性操作。

第六步,设置时间同步

我们把192.168.2.101这台master设置成时间服务器,然后把192.168.2.102以及192.168.2.103设成192.168.2.101的被同步子时间节点。

在192.168.2.101上做如下设置(本次实验环境,因此我们把时间服务器设成ali的时间服务器,地址如下:ntp1.aliyun.co

yum -y install ntp 

vim /etc/ntp.conf

#注释掉所有server *.*.*的指向,新添加一条可连接的ntp服务器  我使用的是阿里云的NTP服务器

server ntp1.aliyun.com ymklinux1

#在其他节点上把ntp指向master服务器地址即可

server 192.168.2.101 ymklinux1

#安装完成后设置ntp开机启动并启动ntp

systemctl enable ntpd

systemctl start ntpd

#查看状态

systemctl status ntpd

注意:各时间子节点设完同步时间服务后要记得运行一下这条命令

ntpdate -d 时间主服务器名或IP

第七步,开始配置三个mysql上的my.cnf文件

它位于/etc/my.cnf文件这个位置。每个mysql里的my.cnf文件使得其内容相同即

# For advice on how to change settings please see

# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html


[mysqld]

#

# Remove leading # and set to the amount of RAM for the most important data

# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.

# innodb_buffer_pool_size = 128M

#

# Remove leading # to turn on a very important data integrity option: logging

# changes to the binary log between backups.

# log_bin

#

# Remove leading # to set options mainly useful for reporting servers.

# The server defaults are faster for transactions and fast SELECTs.

# Adjust sizes as needed, experiment to find the optimal values.

# join_buffer_size = 128M

# sort_buffer_size = 2M

# read_rnd_buffer_size = 2M

datadir=/var/lib/mysql

socket=/var/lib/mysql/mysql.sock


# Disabling symbolic-links is recommended to prevent assorted security risks

symbolic-links=0


log-error=/var/log/mysqld.log

pid-file=/var/run/mysqld/mysqld.pid

port=3306

character-set-server=utf8

innodb_buffer_pool_size=2G

max_connections = 800

max_allowed_packet = 128M

max_heap_table_size = 256M

tmp_table_size = 256M

innodb_buffer_pool_chunk_size = 256M

innodb_buffer_pool_instances = 8

innodb_thread_concurrency = 4

#(核心交易系统设置为1,默认为1,其他2或者0)

innodb_flush_log_at_trx_commit=2

核心参数讲解:

  • character-set-server=utf8 字符集设成UTF8
  • innodb_buffer_pool_size=2G mysql核心innodb引擎参数之缓冲池,这个值一般可以为OS的70%,它和oracle的share_pool_size功能一样,设至大小将直接影响你的查询效率,因为是演示环境,因此不用太大,够用就行。
  • max_connections = 800 数据库最大连接数
  • max_allowed_packet = 128M 一次查询返回的结果集可允许的最大厦size
  • max_heap_table_size = 256M  先说下tmp_table_size吧:它规定了内部内存临时表的最大值,每个线程都要分配。(实际起限制作用的是tmp_table_size和max_heap_table_size的最小值。)如果内存临时表超出了限制,MySQL就会自动地把它转化为基于磁盘的MyISAM表,存储在指定的tmpdir目录下,默认:优化查询语句的时候,要避免使用临时表,如果实在避免不了的话,要保证这些临时表是存在内存中的。如果需要的话并且你有很多group by语句,并且你有很多内存,增大tmp_table_size(和max_heap_table_size)的值。这个变量不适用与用户创建的内存表(memory table).
  • tmp_table_size = 256M
  • innodb_buffer_pool_chunk_size = 128M 此处我们使用默认配置
  • innodb_buffer_pool_instances = 8 CPU数量
  • innodb_thread_concurrency =8 不要超过CPU核数
  • innodb_flush_log_at_trx_commit=2 #(核心交易系统设置为1,默认为1,其他2或者0),

    0代表:log buffer将每秒一次地写入log file中,并且log file的flush(刷到磁盘)操作同时进行。该模式下在事务提交的时候,不会主动触发写入磁盘的操作。

    1代表:每次事务提交时MySQL都会把log buffer的数据写入log file,并且flush(刷到磁盘)中去,该模式为系统默认(因此会保留每一份redo日志)

    2代表:每次事务提交时MySQL都会把log buffer的数据写入log file,但是flush(刷到磁盘)操作并不会同时进行。该模式下,MySQL会每秒执行一次 flush(刷到磁盘)操作。该模式速度较快,也比0安全,只有在操作系统崩溃或者系统断电的情况下,上一秒钟所有事务数据才可能丢失。

3个mySQL都配置完后,记得全部重启Linux,下面我们要进入mySQL的1主多从的搭建了。

mySQL主从搭建

Master配置

修改my.cnf,在最后一行加入如下几行。

此处,演示环境我们对ecom这个schema做主从同步

#主从同步配置
server-id=1
log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=ecom
validate_password=off

注意:

  1. ID一定要为阿拉伯数字,血泪教训;
  2. 另外注意一点,这边为了演练环境方便,我把密码策略都给设成“傻瓜式”了。真式生产环境一定要小心;

Slaver1-192.168.2.102中追加的配置

此处我们对slaver开启了read_only=1,即在从上就不允许发生写操作。

erver-id=2
log-bin=/var/lib/mysql/mysql-bin
relay-log-index=slave-relay-bin.index
relay-log=slave-relay-bin
replicate-do-db=ecom
log-slave-updates
slave-skip-errors=all
read_only=1
validate_password=off

Slaver2-192.168.2.103中追加的配置

server-id=3
log-bin=/var/lib/mysql/mysql-bin
relay-log-index=slave-relay-bin.index
relay-log=slave-relay-bin
replicate-do-db=ecom
log-slave-updates
slave-skip-errors=all
read_only=1
validate_password=off

创建用于复制主从数据的mysql用户

2个Slaver设完后记得重启mysql,记下来我们开始在master上创建一个mySQL用户,它用于同步主从用的。

set global validate_password_policy=0;
set global validate_password_length=0;
create user 'repl' identified by '111111';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'192.168.2.%' identified by '111111';
flush privileges;

使用master的binlog给到Slaver用于“追平”master上的数据

在master所在的centos7上,输入以下命令,把ecom这个schema全量导出。

mysqldump -u ecom -p 111111 -B -F -R -x  ecom|gzip > /opt/mysql_backup/ecom_$(date +%Y%m%d_%H%M%S).sql.gz

然后把它copy到两个slaver上-192.168.2.102和192.168.2.103上

接下来要做的事都是相同的

gzip -d ymkmysql_20200411_215500.sql.gz
mysql -u root -p < ymkmysql_20200411_215500.sql

在两个slaver上追平主库后,开始进入真正的数据库级别的主从同步了。

先跑去master节点上,通过sqlclient

show master status;

记下这2个值来,然后我们跑到192.168.2.102上以及192.168.2.103上都做同样的一件事

stop slave;
reset slave;
change master to master_host='192.168.2.101',master_port=3306,master_user='repl',master_password='111111',master_log_file='mysql-bin.000013',master_log_pos=154;

它的作用在于,把slaver上的binlog同步位置调成和master一致,调完后

start slave;
show slave status;

数据量小的情况下,几分钟就可以看到"slave_io_running"与"slave_sql_running"的状态为Yes了,这代表主从已经完成同步。

主从做完后我们来做个测试

测试1,写发生在主,2从分别可以读到刚才在主上发生的写

在主上我们插入一条记录(order_id自增长)

现在我们连上从1-192.168.2.102

再连上从2-192.168.2.103

完成1主2从的同步,下面我们进入haproxy的搭建

Haproxy的2机热备搭建

安装和配置haproxy

为了演练,我们把haproxy分别安装在和mysql2个slaver一起的主机上,真实生产要记得专门为haproxy安排2台额外的单独主机

yum -y install haproxy

在mysql slaver上创建一个无权限用于haproxy与mysql间心跳检测用的mysql用户

create user 'haproxy' identified by '';

因为前面我们为了演练方便,因此把每个mysql的password策略给禁了,如果是在生产环境这是不可能的,也就是说此处的 identified by '一定有密码',要不然以上这个sql是过不了的。那么没关系,创建完后我们用set password或者用mysql图形化客户端把这个密码给改成“空”吧,因为是无权限用户,因此一点不用担心。

Haproxy的log不会记录在磁盘上,便于我们今后监控和查看Haproxy的日志,我们这边需要把Haproxy的日志和centos的rsyslog作一个关联。

cd /var/log
mkdir haproxy
cd haproxy
touch haproxy.log
chmod a+w haproxy.log
vim /etc/rsyslog.cnf --修改rsyslog.cfg文件

把原文件中的以下两行放开:

# $ModLoad imudp

#$UDPServerRun 514 

并增加以下3行

$ModLoad imudp 
$UDPServerRun 514
#新增的行
local7.*                                                /var/log/boot.log
# Save haproxy log
#新增的行 
local2.*                       /var/log/haproxy/haproxy.log

加完后完整的样子是这样的

还没完,继续修改/etc/sysconfig/rsyslog

vim /etc/sysconfig/rsyslog

SYSLOGD_OPTIONS="" 改为 SYSLOGD_OPTIONS="-r -m 2 -c 2"

改完后重启rsyslog

systemctl restart rsyslog

开始编辑/etc/haproxy/haproxy.cfg文件

haproxy1-192.168.2.102上的配置

把文件内容整个替换成这样吧

global
    # to have these messages end up in /var/log/haproxy.log you will
    # need to:
    #
    # 1) configure syslog to accept network log events.  This is done
    #    by adding the '-r' option to the SYSLOGD_OPTIONS in
    #    /etc/sysconfig/syslog
    #
    # 2) configure local2 events to go to the /var/log/haproxy.log
    #   file. A line like the following can be added to
    #   /etc/sysconfig/syslog
    #
    #    local2.*                       /var/log/haproxy.log
    #
    log         127.0.0.1 local2
 
    chroot      /var/lib/haproxy
    pidfile     /var/run/haproxy.pid
    maxconn     4000
    user        haproxy
    group       haproxy
    daemon
 
    # turn on stats unix socket
    stats socket /var/lib/haproxy/stats
 
#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
    mode                    http
    log                     global
    option                  httplog
    option                  dontlognull
    option http-server-close
    option forwardfor       except 127.0.0.0/8
    option                  redispatch
    retries                 3
    timeout http-request    10s
    timeout queue           1m
    timeout connect         10s
    timeout client          5m
    timeout server          1m
    timeout http-keep-alive 10s
    timeout check           10s
    maxconn                 2000
 
#---------------------------------------------------------------------
# main frontend which proxys to the backends
#---------------------------------------------------------------------
## 定义一个监控页面,监听在1080端口,并启用了验证机制
listen stats
        mode http
        bind 0.0.0.0:1080
        stats enable
        stats hide-version
        stats uri /dbs
        stats realm Haproxy\ Statistics
        stats auth admin:admin
        stats admin if TRUE
 
listen  proxy-mysql 0.0.0.0:23306
        mode tcp
        balance roundrobin
        option tcplog
        option mysql-check user haproxy #在mysql中创建无任何权限用户haproxy,且无密码
        server ymkMySqlSlaver1 192.168.2.102:3306 check weight 1 maxconn 300
        server ymkMySqlSlaver2 192.168.2.103:3306 check weight 1 maxconn 300
        option tcpka

在此,我们做了这么几件事:

  1. 开启了一个1080的haproxy自带监控界面,它的地址为 http://192.168.2.102:1080/dbs
  2. 使用23306端口来代理mysql端口对外暴露给应用用,如:spring boot的jdbc pool的应用
  3. 使用haproxy自带的mysql-check user mysql上用于监听监控的用户此处就是haproxy来探测mysql的可用性

2个haproxy上都做同样的配置,接下去在192.168.2.102和192.168.2.103都按照这个顺序来重启rsyslog与haproxy。

systemctl restart rsyslog
systemctl restart haproxy

接下去,我们可以使用http://192.168.2.103:1080/dbs或者是192.168.2.102:1080,用户名密码都为admin来监控haproxy了。

并且我们可以用192.168.2.102:23306以及192.168.2.103:23306,直接通过haproxy同时连上2台mysql的slavers。此时,在应用通过haproxy连上2个slavers时,只有一个mysql“挂”了,应用是无感知的,它会在毫秒内被飘移到另一个可用的mysql slaver上,我们来做实验。

我们通过192.168.2.102:23306连上2个mysql的slaver

查询执行成功,然后我们来杀掉任意一台mysql。。。嗯。。。192.168.2.102,就杀了你吧,嗯,对,就是你!!!

好,我被杀了!

再次回到应用连接处,依旧执行一次查询

老婆,出来看上帝。。。看,查询依旧可以被执行,说明,Haproxy已经帮我们把应用连接自动飘到了192.168.2.103上去了。

再做个测试,我们来杀haproxy,杀192.168.2.103上的haproxy

看,应用连接依然有效,说明192.168.2.102上的haproxy起作用了。

接下去就是keepalived的布署了,激动人心的时候就要到来鸟!

布署keepAlived集群

安装KeepAlived

在每台slaver上安装keepalived,因为是演练环境,因此我们把keepalived也装在和haproxy一起的vm上,如果是生产环境一定记得要为keepalived布署单独的server,至少2台。

yum instlal keepalived

简单的不能再简单了。

配置keepalived

注意了!每台keepalived的配置是不同的,这里分主次关系,这里不像keepalived是load roubin的概念,这可是有优先级的概念哦。

对/etc/keepalived/keepalived.conf文件进行编辑。

192.168.2.102上的keepalived-lb01

global_defs {
 
   router_id LB01
 
}
 
vrrp_script chk_haproxy
 
{
 
     script "/etc/keepalived/scripts/haproxy_check.sh"
 
     interval 2
 
     timeout 2
 
     fall 3
 
}
 
vrrp_instance haproxy {
 
    state MASTER
 
    #interface eth0
     interface enp0s3
    virtual_router_id 1
 
    priority  100  
 
    authentication {       
 
            auth_type PASS       
 
            auth_pass password   
 
    } 
  unicast_peer {
    192.168.2.102
    192.168.2.103
  }
    virtual_ipaddress {
 
           192.168.2.201
 
    }
 
    track_script {
 
         chk_haproxy
 
    }
 
    notify_master "/etc/keepalived/scripts/haproxy_master.sh"
 
}

此处我们做了如下几件事:

  1. 设定了192.168.2.102为 master状态
  2. 名字为:LB01
  3. 优先级为100,当然,另一台keepalived的话一定优先级比他低,一定记得,从机的priority小于主机的priority
  4. 定义了一个虚拟ip,它叫192.168.2.201,有了这个ip后,一切应用都通过这个ip来访问mysql的slavers集群了
  5. 把该虚ip绑定到“interface enp0s3”网卡上,这块网卡是我在192.168.2.102上的千兆网络接口的系统接口名

192.168.2.103上的keepalived-lb02

此处,有2个地方和lb01的配置有不同:

  1. router_id
  2. priority
global_defs {
 
   router_id LB02
 
}
 
vrrp_script chk_haproxy
 
{
 
     script "/etc/keepalived/scripts/haproxy_check.sh"
 
     interval 2
 
     timeout 2
 
     fall 3
 
}
 
vrrp_instance haproxy {
 
    state BACKUP
 
    #interface eth0
     interface enp0s3
 
    virtual_router_id 1
 
    priority  50  
 
    authentication {       
 
          auth_type PASS       
 
          auth_pass password   
 
    }       
unicast_peer {
    192.168.2.102
    192.168.2.103
}
    virtual_ipaddress {
 
          192.168.2.201
 
    }
 
    track_script {
 
         chk_haproxy
 
    }
 
    notify_master "/etc/keepalived/scripts/haproxy_master.sh"
 
}

下面给出haproxy_check.sh和haproxy_master.sh文件的内容

haproxy_check.sh-每个keepalived的机器上都要放

#!/bin/bash
 
LOGFILE="/var/log/keepalived-haproxy-state.log"
 
 date >>$LOGFILE
 
 if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
 
     echo "fail: check_haproxy status" >>$LOGFILE
 
     exit 1
 
 else
 
     echo "success: check_haproxy status" >>$LOGFILE
 
     exit 0
 
fi

haproxy_master.sh-每个keepalived的机器上都要放

#!/bin/bash
LOGFILE="/var/log/keepalived-haproxy-state.log"
echo "Being Master ..." >> $LOGFILE

测试keepalived功能

先把两台机器上的keepalived启动起来

测试1:通过VIP:192.168.2.201:23306连入mysql

成功!

测试2:杀192.168.2.102上的mysql进程后并通过vip查询

飘移成功!!!

测试3:杀192.168.2.102上的haproxy进程后并通过vip查询

成功,毫无压力!!!Come on, again, I wanna more!

测试4:杀192.168.2.103上的haproxy,对mysql master作写,并在2从上做读

以观看keepalived对haproxy是否真正起到了热备

结论

keepalived+haproxy+mysql的1主多从的热备搭建成功!

下面我们来攻克另一个课题,那就是,如何在springboot应用中,不要让我们一群开发每个人记得“主写-从读”并在编码上自己去实现,我们需要在整体代码框架层统一去做“拦截”,以实现自动根据service层的方法名来做到 “主写-从读”。

代码层实现主写-从读的自动拦截封装解决方案

框架一览

pom.xml中需要额外加入的内容

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jdbc</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
</dependency>

applicatin.properties

此处我们定义了2个数据源,一个叫master,一个叫slaver,而slaver的地址我们即没指向192.168.2.102也没指向192.168.2.103,而是指向了keepalived暴露的那个虚IP192.168.2.201:23306

logging.config=classpath:log4j2.xml
 
#master db
mysql.datasource.master.type=com.alibaba.druid.pool.DruidDataSource
mysql.datasource.master.driverClassName=com.mysql.jdbc.Driver
mysql.datasource.master.url=jdbc:mysql://192.168.2.101:3306/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false
mysql.datasource.master.username=ecom
mysql.datasource.master.password=111111
mysql.datasource.master.initialSize=50
mysql.datasource.master.minIdle=50
mysql.datasource.master.maxActive=100
mysql.datasource.master.maxWait=60000
mysql.datasource.master.timeBetweenEvictionRunsMillis=60000
mysql.datasource.master.minEvictableIdleTimeMillis=120000
mysql.datasource.master.validationQuery=SELECT'x'
mysql.datasource.master.testWhileIdle=true
mysql.datasource.master.testOnBorrow=false
mysql.datasource.master.testOnReturn=false
mysql.datasource.master.poolPreparedStatements=true
mysql.datasource.master.maxPoolPreparedStatementPerConnectionSize=20
#slaver db
mysql.datasource.slaver1.type=com.alibaba.druid.pool.DruidDataSource
mysql.datasource.slaver1.driverClassName=com.mysql.jdbc.Driver
mysql.datasource.slaver1.url=jdbc:mysql://192.168.2.201:23306/ecom?useUnicode=true&characterEncoding=utf-8&useSSL=false
mysql.datasource.slaver1.username=ecom
mysql.datasource.slaver1.password=111111
mysql.datasource.slaver1.initialSize=50
mysql.datasource.slaver1.minIdle=50
mysql.datasource.slaver1.maxActive=100
mysql.datasource.slaver1.maxWait=60000
mysql.datasource.slaver1.timeBetweenEvictionRunsMillis=60000
mysql.datasource.slaver1.minEvictableIdleTimeMillis=120000
mysql.datasource.slaver1.validationQuery=SELECT'x'
mysql.datasource.slaver1.testWhileIdle=true
mysql.datasource.slaver1.testOnBorrow=false
mysql.datasource.slaver1.testOnReturn=false
mysql.datasource.slaver1.poolPreparedStatements=true
mysql.datasource.slaver1.maxPoolPreparedStatementPerConnectionSize=20

启文件MultiDSDemo.java

package org.sky.retail.demo;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.transaction.annotation.EnableTransactionManagement;
 
@ServletComponentScan
@EnableAutoConfiguration(exclude = { DataSourceAutoConfiguration.class })
@ComponentScan(basePackages = { "org.sky.retail.demo" })
@EnableTransactionManagement
public class MultiDSDemo {
 
    public static void main(String[] args) {
        SpringApplication.run(MultiDSDemo.class, args);
    }
 
}

自动装配用MultiDSConfig.java

package org.sky.retail.demo.config;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.sql.DataSource;
 
import org.sky.retail.demo.util.db.DBTypeEnum;
import org.sky.retail.demo.util.db.MyRoutingDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 
import com.alibaba.druid.pool.DruidDataSource;
 
@Configuration
public class MultiDSConfig {
 
    @Bean
    @ConfigurationProperties(prefix = "mysql.datasource.master")
    public DataSource masterDataSource() {
        return new DruidDataSource();
    }
 
    @Bean
    @ConfigurationProperties(prefix = "mysql.datasource.slaver1")
    public DataSource slave1DataSource() {
        return new DruidDataSource();
    }
 
    @Bean
    public DataSource myRoutingDataSource(@Qualifier("masterDataSource") DataSource masterDataSource,
            @Qualifier("slave1DataSource") DataSource slave1DataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DBTypeEnum.MASTER, masterDataSource);
        targetDataSources.put(DBTypeEnum.SLAVE1, slave1DataSource);
        // targetDataSources.put(DBTypeEnum.SLAVE2, slave2DataSource);
        MyRoutingDataSource myRoutingDataSource = new MyRoutingDataSource();
        myRoutingDataSource.setDefaultTargetDataSource(masterDataSource);
        myRoutingDataSource.setTargetDataSources(targetDataSources);
        return myRoutingDataSource;
    }
 
    @Bean
    public JdbcTemplate dataSource(DataSource myRoutingDataSource) {
        return new JdbcTemplate(myRoutingDataSource);
    }
 
    @Bean
    public DataSourceTransactionManager txManager(DataSource myRoutingDataSource) {
        return new DataSourceTransactionManager(myRoutingDataSource);
    }
}

用于对service层方 法做自动切面实现“主写从读”的DataSourceAop.java

package org.sky.retail.demo.aop;
 
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.sky.retail.demo.util.db.DBContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
 
import javax.servlet.http.HttpServletRequest;
 
@Aspect
@Component
public class DataSourceAop {
    protected Logger logger = LogManager.getLogger(this.getClass());
 
    /**
     * 定义切入点,切入点为org.sky.retail.demo.service下的所有函数
     */
    @Pointcut("!@annotation(org.sky.retail.demo.util.db.Master) "
            + "&& (execution(* org.sky.retail.demo.service..*.select*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.get*(..)))")
    public void readPointcut() {
 
    }
 
    @Pointcut("@annotation(org.sky.retail.demo.util.db.Master) "
            + "|| execution(* org.sky.retail.demo.service..*.insert*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.add*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.update*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.edit*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.delete*(..)) "
            + "|| execution(* org.sky.retail.demo.service..*.remove*(..))")
    public void writePointcut() {
 
    }
 
    @Before("readPointcut()")
    public void read(JoinPoint joinPoint) {
 
        // 接收到请求,记录请求内容
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        DBContextHolder.slave();
    }
 
    @Before("writePointcut()")
    public void write() {
        DBContextHolder.master();
    }

DBTypeEnum.java

package org.sky.retail.demo.util.db;
 
public enum DBTypeEnum {
 
    MASTER, SLAVE1;
 
}

Master.java

package org.sky.retail.demo.util.db;
 
public @interface Master {
 
}

用于实现多数据源路由的MyRoutingDataSource.java

package org.sky.retail.demo.util.db;
 
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
 
/**
 * Created by mk on 2018/4/3 - to replace ODY's jdbc package.
 */
public class MyRoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return DBContextHolder.get();
    }
}
MyRoutingDataSource.java

DBContextHolder.java

package org.sky.retail.demo.util.db;
 
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicInteger;
 
public class DBContextHolder {
    protected static Logger logger = LogManager.getLogger(DBContextHolder.class);
    private static final ThreadLocal<DBTypeEnum> contextHolder = new ThreadLocal<>();
    private static final AtomicInteger counter = new AtomicInteger(0);
 
    public static void set(DBTypeEnum dbType) {
        contextHolder.set(dbType);
    }
 
    public static DBTypeEnum get() {
        return contextHolder.get();
    }
 
    public static void master() {
        set(DBTypeEnum.MASTER);
        logger.info(">>>>>>切换到master");
    }
 
    public static void slave() {
        set(DBTypeEnum.SLAVE1);
        logger.info(">>>>>>切换到slave");
        // 轮询用来实现应用程序随机飘移,下面这段代码如果说AZURE的PAAS上不能使用haproxy来飘那就只能使用原子类型取模来随机飘移多slaver了
//      int index = counter.getAndIncrement() % 2;
//      logger.info("counter.getAndIncrement() % 2======" + index);
//      if (counter.get() > 9999) {
//          counter.set(-1);
//      }
//      if (index == 0) {
//          set(DBTypeEnum.SLAVE1);
//          logger.info(">>>>>>切换到slave1");
//      } else {
//          set(DBTypeEnum.SLAVE1);// todo SLAVE2
//          logger.info(">>>>>>切换到slave2");
//      }
    }
 
}

Service类-OrderService.java

package org.sky.retail.demo.service;
 
import org.sky.retail.demo.vo.Order;
 
public interface OrderService {
 
    public void insertOrder(Order order) throws Exception;
 
    public Order getOrderByPK(int orderId) throws Exception;
}

测试用-OrderController.java

package org.sky.retail.demo.controller;
 
import java.util.HashMap;
import java.util.Map;
 
import javax.annotation.Resource;
 
import org.sky.platform.retail.controller.BaseController;
import org.sky.retail.demo.service.OrderService;
import org.sky.retail.demo.vo.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
 
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.ApiResponse;
 
@RestController
@RequestMapping("/demo/order")
public class OrderController extends BaseController {
 
    @Resource
    private OrderService orderService;
 
    @ApiOperation(value = "创建一条订单", notes = "传入一个订单信息以增加一条订单")
    @ApiResponses(value = { @ApiResponse(code = 200, message = "创建订单成功"),
            @ApiResponse(code = 403, message = "创建订单失败,不能创建空的订单对象"),
            @ApiResponse(code = 417, message = "创建订单失败, 因为某个系统错误") })
    @RequestMapping(value = "/addEmployee", method = RequestMethod.POST)
    public ResponseEntity<String> addOrder(@RequestBody String params) throws Exception {
        ResponseEntity<String> response = null;
        String returnResultStr;
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        Map<String, Object> result = new HashMap<>();
        try {
 
            JSONObject requestJsonObj = JSON.parseObject(params);
            if (requestJsonObj != null) {
                Order order = getOrderFromJson(requestJsonObj);
                logger.info(">>>>>>addOrder");
                orderService.insertOrder(order);
                result.put("code", HttpStatus.OK.value());
                result.put("message", "insert a new order successfully");
                result.put("orderId", order.getOrderId());
                result.put("goodsId", order.getGoodsId());
                result.put("amount", order.getAmount());
                returnResultStr = JSON.toJSONString(result);
                response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.OK);
            } else {
                result.put("code", HttpStatus.FORBIDDEN.value());
                result.put("message", "can not add a empty order");
            }
        } catch (Exception e) {
            logger.error("addOrder error: " + e.getMessage(), e);
            result.put("message", "add order error error: " + e.getMessage());
            returnResultStr = JSON.toJSONString(result);
            response = new ResponseEntity<>(returnResultStr, headers, HttpStatus.EXPECTATION_FAILED);
        }
        return response;
    }
 
    @ApiOperation(value = "通过订单ID获取订单信息接口", notes = "传入订单ID获取信息")
    @ApiImplicitParam(name = "orderId", value = "订单ID", paramType = "query", defaultValue = "", required = true)
    @RequestMapping(value = "/getOrderById", method = RequestMethod.GET)
    public Order getOrderById(int orderId) throws Exception {
        Order order = new Order();
        try {
            order = orderService.getOrderByPK(orderId);
        } catch (Exception e) {
            logger.error("getOrderById orderId->" + orderId + " error: " + e.getMessage(), e);
        }
        return order;
    }
 
    private Order getOrderFromJson(JSONObject requestObj) {
        // int orderId = requestObj.getInteger("orderId");
        int goodsId = requestObj.getInteger("goodsId");
        int amount = requestObj.getInteger("amount");
        Order order = new Order();
        // order.setOrderId(orderId);
        order.setGoodsId(goodsId);
        order.setAmount(amount);
        return order;
    }
}

测试

测试1-写Go Master读Go Slaver

读操作测试

为了测试可视化,我有意在DbContextHolder处做了2个日志输出。

测试2-杀192.168.2.102上的mysql来进行读

无任何压力,毫秒级飘移!

测试3-杀192.168.2.103上的keeyalived来进行读-这个有点猛

猛也要杀啊,程序员没啥难的,JAVA从入门到删库跑路,PYTHON零基础到进精神病院!

毫无压力,看到没!

测试4-别测了你,再测用jmeter去吧,打完收工!

想收工?可能吗?附一张生产级别haproxy+keepalived的布署图

一定记得:

  • haproxy实际生产要和mysql分开布。
  • haproxy+keepalived可以在一个vm上。

前面如果是在腾讯或者是ali云用lvs做vip地址转换,如果是azure就用ILB把vip转向应用层。

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章