rabbitmq客户端自动重连

编程rookie, 如有错误请指出 ☞: 253065903@qq.com

RabbitMQ Node.js 客户端( AMQP 0-9-1 V0.5.2 )自动重连

重启策略

开始找解决方案:

  1. 通过查看AMQP的源码,发现没有reconnect的选项

  2. 然后上GitHub上看有没有人提出类似的问题 github repo ,通过输入 reconnect 搜索issue区找到题为 Support Auto-reconnectionissue ,发现这个问题是早在 2013 年就提出来的,可是现在还是 open

    的状态!

  3. 在这个Issue区发现有一个解决方案还是可以实践一下的:

    function connectRMQ() {
      amqp.connect(config.rabbitmq.URI).then(function(conn) {
        conn.on('close', function() {
          console.error('Lost connection to RMQ.  Reconnecting in 60 seconds...');
          return setTimeout(connectRMQ, 60 * 1000);
        });
        return conn.createChannel().then(function(ch) {
            var ok = ch.assertQueue(config.rabbitmq.queue, {durable: true});
            ok = ok.then(function() {
                ch.prefetch(1);
                ch.consume(config.rabbitmq.queue, doWork, {noAck: false});
            });
            return ok.then(function() {
                console.log(" [*] Waiting in %s.", config.rabbitmq.queue);
            });
    
            function doWork(msg) {
                var body = msg.content.toString();
                console.log(" [x] Received %s", body);
                setTimeout(function() {
                    ch.ack(msg);
                }, config.rabbitmq.timeout);
            }
        });
      }).then(null, function() {
         setTimeout(connectRMQ, 10 * 1000);
         return console.log('connection failed');
      });
    }
    
    connectRMQ();
    

上述的解决方案是在建立连接之后对连接添加 close 的监听事件,当 close 事件触发,

或者连接出错以及之后的步骤出错都在10s之后重新调用方法本身实现重连

  1. 还有一种简单粗暴的方法,监听 closeerror 事件, 有错误就抛出来,然后依靠外部的守护程序将此客户端重启

实践

按照Issue区发现的解决方案进行实践,修改之前的代码

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      reconnect(e)
    })
}

function reconnect(e) {
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
        connect()
    }, 10 * 1000)
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}

然后进行测试:

通过对MQserver的重启,均正常,然后将MQserver的机器的网断掉测试,发现了close事件并没有监听到,而是报了heartbeat超时的错误,从而程序直接退出了,于是又在代码中加入对error事件的监听:

amqp
  .connect(MQ_CONFIG.url)
  .then(conn => {
    conn.on('error', e => {
      reconnect(e)
    })
    conn.on('close', e => {
      reconnect(e)
    })
    return conn
  })

这下应该不会导致程序退出了吧,然而又引入了新的问题,当重启MQserver时,报了 ECONNECTRET 的错误,两个监听事件都监听到了,所以程序重连了两次,导致一个项目在MQserver上建立了两个连接,当再一次重启MQserver时,建立了四个连接!

这是很严重的错误,然而并不是所有时候两个监听事件都能监听到,比如 heartbeat 超时就只报 error 的错误,所有需要想出一个策略,让程序始终与MQserver之间只有一个连接。

采用声明一个变量,记录是不是正在连接

var isConnecting = false

如果已经在连接了,其他的重连都不做处理

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}

连接上时将重连的标志设为 false

.then(conn => {
  cnt = 0
  log('connect RMQ OK')
  isConnecting = false

于是乎,完整代码如下:

#!/usr/bin/env node

const MQ_CONFIG = require('./conf/rabbitmq')
const REDIS_CONFIG = require('./conf/redis')
const Utils = require('./lib/Utils')
const pubRedisCli = Utils.connectRedis(REDIS_CONFIG.url)
var amqp = require('amqplib')
var ex = MQ_CONFIG.ex
var patten = MQ_CONFIG.routing_key
var exType = MQ_CONFIG.ex_type
var q = MQ_CONFIG.q || 'signals'
var cnt = 0
var isConnecting = false

function connect() {
  amqp
    .connect(MQ_CONFIG.url)
    .then(conn => {
      conn.on('error', (e) => {
        reconnect(e)
      })
      conn.on('close', e => {
        reconnect(e)
      })
      return conn
    })
    .then(conn => {
      cnt = 0
      log('connect RMQ OK')
      isConnecting = false
      console.log(' [*] Waiting for signals. To exit press CTRL+C')
      return conn.createChannel()
    })
    .then(ch => {
      return ch.assertQueue(q, {
        durable: true
      })
        .then(() => {
          return ch.assertExchange(ex, exType, {
            durable: true
          })
        })
        .then(() => {
          return ch.assertQueue(q, {
            durable: true
          })
        })
        .then(() => {
          return ch.bindQueue(q, ex, patten)
        })
        .then(() => {
          return ch.consume(q, (msg) => {
            pubRedisCli.publish(msg.fields.routingKey, msg.content.toString(), function (err, reply) {
              if (err) {
                log(err)
                ch.nack(msg)
              } else {
                if (reply !== 0) {
                  ch.ack(msg)
                } else {
                  ch.ack(msg)
                  saveUnSubscribeMsg(msg.content.toString())
                }
              }
            })
          })
        })
    })
    .catch(e => {
      isConnecting = false
      reconnect(e)
    })
}

function reconnect(e) {
  if (!isConnecting) {
    isConnecting = true
    log(e.message)
    log('lost RMQ connection')
    cnt++
    log(`正在第${cnt}次重新连接RMQ...`)
    setTimeout(() => {
      connect()
    }, 10 * 1000)
  }
}
connect()
/**
 * if signals didn't be subscribed, they would be saved to ./data dir
 * @param {string} msg
 */
function saveUnSubscribeMsg(msg) {
  let date = new Date().toLocaleDateString()
  const fs = require('fs')
  const dir = './data'
  if (!fs.existsSync(dir)) {
    fs.mkdirSync(dir)
  }
  let path = `${dir}/${date}.txt`
  let isExist = fs.existsSync(path)
  if (isExist) {
    fs.appendFileSync(path, msg)
  } else {
    fs.writeFileSync(path, msg)
  }
}

function log(...args) {
  console.log(...args, new Date().toLocaleString())
}
我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章