RabbitMQ实战:居然有这么多骚操作

RabbitMQ的Java客户端统一使用 com.rabbitmq.client 作为顶级包名。其中,最核心的类主要有:ConnectionFactory、Connection、Channel、Consumer、DefaultConsumer、BasicProperties。需要说明的是,本文不只是教你RabbitMQ客户端的基本玩法,还有一些你可能不知道的一些骚操作。

连接RabbitMQ

使用RabbitMQ第一步当然是连接RabbitMQ,这里就不说怎么搭建RabbitMQ环境了,本文假设你已经有RabbitMQ环境,连接RabbitMQ的代码如下:

ConnectionFactory factory = new ConnectionFactory();

factory.setUsername("root");

factory.setPassword("root123");

factory.setVirtualHost("/");

factory.setHost("127.0.0.1");

factory.setPort(5672);

Connection conn = factory.newConnection();

需要说明的是,如果你用的是默认vhost,即/。那么factory.setVirtualHost("/")这行代码可以省掉。那么,这里有一个 有趣 的问题:创建RabbitMQ连接最短的代码是怎样的?答案是只需要两行代码即可。这是为什么呢?因为创建连接的这几个字段都有默认值,用户名密码默认值默认为guest/guest,host和端口默认为localhost和5672(ConnectionFactory.java源码中有DEFAULT_开头命名的常量,就是默认值)。不过需要注意的是默认账户guest只能连接本地RabbitMQ环境:

ConnectionFactory factory = new ConnectionFactory();

Connection conn = factory.newConnection();

创建RabbitMQ连接还有另一种通过URI的方式,代码如下:

ConnectionFactory factory = new ConnectionFactory();

factory.setUri("amqp://username:password@hostName:port/virtualHost");

Connection conn = factory.newConnection();

每成功创建连接,在RabbitMQ服务端都有相应的日志:

2020-05-10 17:51:58.380 [info] <0.7312.0> accepting AMQP connection <0.7312.0> ([::1]:61390 -> [::1]:5672)

2020-05-10 17:51:58.509 [info] <0.7312.0> connection <0.7312.0> ([::1]:61390 -> [::1]:5672): user 'guest' authenticated and granted access to vhost '/'

被动申明

如下这段代码所示,被动申明一个队列。它只检查队列是否存在,如果存在,那么不会有任何操作,并且返回和主动且成功创建队列一样的响应信息。如果队列不存在,那么就会抛出Channel级别的异常。所以,被动申明一般使用在一次性临时性Channel申明的地方:

Queue.DeclareOk response = channel.queueDeclarePassive("queue-name");

// returns the number of messages in Ready state in the queue

response.getMessageCount();

// returns the number of consumers the queue has

response.getConsumerCount();

如果队列不存在时,抛出的异常信息如下:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'queue-none' in vhost '/', class-id=50, method-id=10)

线程安全

强制要求一个线程一个Channel,不要多个线程共用一个Channel实例,否则会出现一些莫名其妙的错误。

消费者(Push模式)

消费者消费消息一般通过channel.basicConsume方法,这个方法有很多重载参数,不过我们常用的方法是下面这两个。官方更加推荐第一个带有consumerTag的方法,并且每个不同的消费者实例要有不同的consumerTag。强烈不建议一个连接上有相同的consumerTag,否则可能会导致 automatic connection recovery 的问题,参考(https://www.rabbitmq.com/api-guide.html#connection-recovery):

String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

consumerTag是消费者唯一标识符。如果是使用了不带consumerTag参数的方法,那么RabbitMQ会自动生成一个唯一Tag,这样的Tag没有业务参考意义。如下图所示:

如果我们自定义了consumerTag的值,那么,一看某个队列的消费者信息,就知道这些消费者来自哪里、是干嘛的,非常让人容易理解。如下图所示:

需要说明的是,这种Push模式,如果生产者产生的消息量超过消费者能承受的量,就会撑爆消费者。不过,RabbitMQ考虑到了这一点,可以通过方法 channel.basicQos (1000)进行限流。basic.qos是针对Channel进行设置的,也就是说只有在channel建立之后才能发送basic.qos命令。在rabbitmq的实现中,每个channel都对应会有一个rabbit_limiter进程,当收到basic.qos命令后,在rabbit_limiter进程中记录信令中prefetch_count的值,同时记录的还有该channel未ack的消息个数,从而保证未ack的消息数量不超过prefetch_count的值(如果prefetch_count设置为0,表示没有任何限制)。

消费者(Pull模式)

Push模式对应的消费端方法是basicConsumer(),而Pull模式对应的消费端方法是basicGet()。每次获取一条消息,不能批量。这种方法效率非常低下,因为不知道队列中是否有消息,所以必须反复询问,即使大部分请求没有结果的情况下,这种方法 非常不推荐使用 。代码如下:

boolean autoAck = false;

GetResponse response = channel.basicGet(queueName, autoAck);

if (response == null) {

// No message retrieved.

} else {

AMQP.BasicProperties props = response.getProps();

byte[] body = response.getBody();

long deliveryTag = response.getEnvelope().getDeliveryTag();

// do something

channel.basicAck(method.deliveryTag, false);

}

高级连接方式

消费者线程默认被一个ExecutorService自动分配,当连接被关闭的时候,默认的ExecutorService会调用shutdown()。但是,如果创建连接的时候使用了用户自定义ExecutorService,必须手动调用shutdown()方法,否则,线程池中的线程可能会阻止JVM终止,除非kill -9。使用自定义线程池代码如下所示:

ExecutorService es = Executors.newFixedThreadPool(20);

Connection conn = factory.newConnection(es);

需要说明的是,这个特性只有当你明确知道在消费者callback碰到处理瓶颈的情况下才考虑使用,如果没有消费者callback,或者非常少量,那么默认的线程池完全足矣。

使用地址集合

在通过factory构造Connection的时候,允许配置多个地址集合,代码如下所示。它会首先尝试连接host1:post1,如果连接失败,会再尝试连接host2:post2,而且整个过程对用户无感知,只要有一个地址是可用的,就不会抛出任何异常:

Address[] addr = new Address[]{ new Address(host1, port1),

new Address(host2, port2)};

Connection conn = factory.newConnection(addr);

支持NIO

RabbitMQ的Java客户端从4.0开始支持Java NIO。NIO的目的不是为了比BIO更快,它只是为了方便用户更轻易的控制资源,比如线程等。默认的BIO模式下,每一个Connection连接都会用一个线程从网络Socket中读取数据。而在NIO模式下,我们是可以控制与网络Socket交互的线程数。

如果你的Java进程中使用了几十甚至上百个Connection,那么可以尝试使用NIO模式,因为它相比默认的BIO模式,可以节省很多的线程资源。并且在线程数设置合理的情况下,性能不会有任何衰减。开启NIO模式非常简单,如下所示:

ConnectionFactory factory = new ConnectionFactory();

// 默认nio模式线程数为1

factory.useNio();


另一种使用NIO的方式:

factory.setNioParams(new NioParams().setNbIoThreads(4));

网络故障自动恢复

在RabbitMQ服务器和Java客户端之间的网络故障是很常见的现象,RabbitMQ的Java客户端是支持自动恢复的,并且4.0以后该特性是默认开启的,证据在ConnectionFactory的源码中:

private boolean automaticRecovery = true;

private boolean topologyRecovery = true;

topology是什么意思?中文是拓扑的意思。在这里是指交换机、队列、绑定关系、消费者等。

我们也可以在new出来ConnectionFactory的时候,显示设置开启or关闭。如果恢复失败,RabbitMQ会固定时间间隔以后进行重试,默认为5秒钟(DEFAULT_NETWORK_RECOVERY_INTERVAL)。可以通过方法setNetworkRecoveryInterval()指定间隔时间。如果构造Connection时用的是地址集合,那么地址会被随机打乱,然后一个接一个进行重试:

ConnectionFactory factory = new ConnectionFactory();

factory.setAutomaticRecoveryEnabled(true);

factory.setNetworkRecoveryInterval(10000);

那么故障恢复在什么时候触发呢?主要是如下这些情况,只要任意一个条件发生都会触发:

  1. Connection上抛出IO异常、或者其他一些其他非预期的异常;

  2. scoket读取超时;

  3. 失去心跳;

如果是应用启动过程中初始化连接碰到RabbitMQ节点故障,这种情况下自动连接恢复是不会介入的。因为这种情况下,很可能RabbitMQ有一些故障或者问题,开发人员有责任排查问题原因。另外,如果显示调用connection.close()方法后,恢复机制也不会介入。

心跳机制

创建ConnectionFactory时,设置一个大于0的值就是开启心跳机制。如果设置等于0的值,就是关闭心跳机制:

ConnectionFactory cf = new ConnectionFactory();

// set the heartbeat timeout to 60 seconds

cf.setRequestedHeartbeat(60);

需要说明的是,如果设置心跳超时值太低的话,可能会由于一些原因比如瞬时网络故障等导致误报。这里给出一些经验数据:值低于5秒的话,很可能造成误报。值低于1秒的话,基本上都是误报。值在5~20秒之间对大部分环境来说,都是一个比较理想的值。如果是一个很大的值,例如1800秒,这时候心跳信息传送的少了,几乎没有实际的影响,就相当于关闭了心跳机制。

END

如果读完觉得有收获的话,欢迎点【好看】,关注【阿飞的博客】,查阅更多精彩历史!!!

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章