RabbitMQ学习笔记之(二) 消费端的确认与拒绝

首先,我们先看下这样的业务场景,在消息发出后, Consumer 接收到了生产者所发出的消息,但在 Consumer 突然出错崩溃,或者异常退出了,但是生产者消息已经发出来了,那么这个消息可能就会丢失,为了解决这样的问题, RabbitMQ 引入了 ack 机制。 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时, RabbitMQ 会⾃自动把发送出去的消息置为确认, 然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。 当采用ack消息确认机制后,只要将 autoAck 设置为 false 。消费者就可以有足够的时间来处理消息,而不用担心消费过程中突然异常退出导致消息丢失的情况,因为 RabbitMQ 会一直持有消息,直到消费者调用 basic.ack 为止。在这种情况下,对于 RabbitMQ 来说,队列中的消息就可以分为两部分,一部分是等待发送给消费者的消息,另外一部分就是等待接收消费者确认的消息。那么如果在这个时候,消费者突然发生中断,在消费中的消息会怎么处理呢? 如果 RabbitMQ 一直没有接收到消费者的确认消息,并且消费者的连接已经关闭,那么 RabbitMQ 就会重新将让消息进入队列中,等待下一个消费者消费。 RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。 参考如下代码,我们设置了 auto_ack=True

import pika
from time import sleep

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sleep(1)

channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

可以看到,如果我们设置 auto_ack=True 之后,虽然我们在消费的时候有休眠1s,而且这个时候消息还没有全部消费完,但是在后台看到队列中的消息已经被消费完了,这个原因是当消费者连接上队列了,因为没有指定消费者一次获取消息的条数,所以队列把队列中的所有消息一下子推送到消费者端,当消费者订阅的该队列,消息就会从队列推到客户端,当消息从队列被推出的时的那一刻就表示已经对消息进行自动确认了,消息就会从队列中删除。 下面我们再看看设置 auto_ack=False 的情况:

import pika
from time import sleep

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sleep(1)

channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

运行结果如下: 可以看到,队列中的消息都变成了 unacked 状态,这是为什么呢? 我们上面有说过, rabbitmq 需要等到消费者显示的调用 basic.ack ,要不然的话 rabbitmq 会一直持有这些消息,如果我们在这个时候再启动一个消费者的话,可以看到这些消息还会再次被消费。为了解决这个问题,我们只要修改下 callback 方法如下:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sleep(1)
    # delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字.
    ch.basic_ack(delivery_tag=method.delivery_tag)

就可以,这时,我们重启消费者,可以看到我们的消息会被正常的消费,并且队列中消息的不会被瞬间清空,而是按照我们的消费速度一个一个的删除。 消费者在接收到消息之后,还可以拒绝消息,我们只需要调用 basic_reject 就可以,如下:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sleep(1)
    # delivery_tag 是在 channel 中的一个消息计数, 每次消息提取行为都对应一个数字.
    ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)

requeue 参数的意思是被拒绝的这个消息是否需要重新进入队列,默认是 True

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章