RabbitMQ学习笔记之(一) 基本概念介绍

基本介绍

RabbitMQ , 是一个使用 erlang 编写的 AMQP(高级消息队列协议) 的服务实现. 简单来说, 就是一个功能强大的消息队列服务.流程上来说,是发消息者(producer)把消息放到队列(queue)中去,然后收消息者(consumer)从队列中取出消息. RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在 发消息者队列 之间, 加入了 交换器 (Exchange) . 这样 发消息者队列 就没有直接联系, 转而变成 发消息者 把消息给 交换器 , 交换器 根据调度策略再把消息再给 队列

rabbitmq 中几个比价重要的概念如下:

rabbitmq

消息队列运转过程:

使用示例

下面介绍下 python 如何使用 rabbitmq ,这里假定你已经有了 rabbitmq 的环境并且已经配置好了,下面只介绍如何使用。 生产者:

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
# 从连接上获取channel
channel = connection.channel()
# 定义名为testexchange的交换机类型是fanout,交换机支持持久化
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

# 定义名为hello的队列,设置其支持持久化
channel.queue_declare(queue='hello', durable=True)
# 将hello队列绑定到我们定义的testexchange交换机上
channel.queue_bind(exchange='testexchange', queue='hello')
for i in range(10):
    channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

消费者:

import pika
from time import sleep

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()

channel.queue_declare(queue='hello', durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    sleep(1)

channel.basic_consume(
    queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

这里在消费者和生产者都定义了同样的队列,这样做是因为你不知道消费者还是生产者哪个会先启动起来。 我们这里为交换机,队列,消息都设置了 durable=True 使其支持持久化,这样在当rabbitmq异常退出之后,你的消息不至于丢失。

交换机类型

参考在上面的生产者中定义交换机的代码:

channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

其中 exchange_type='fanout' 就是设置交换机的类型, RabbitMQ 常用的交换器类型有 fanoutdirecttopicheaders 这四种,下面分别介绍下。

fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。 参考如下代码:

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='fanout', durable=True)

channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)

channel.queue_bind(exchange='testexchange', queue='hello')
channel.queue_bind(exchange='testexchange', queue='hello1')

for i in range(10):
    channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

运行结果如下(这里我都没有启动consumer): 这里可以看到, hellohello1 队列中都有消息进入,而 hello2 没有,因为他没有绑定。

direct

direct类型的交换器路由规则也很简单,它会把消息路由到那些RoutingKey完全匹配的队列中。 参考如下代码:

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)

channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello1')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')

for i in range(10):
    channel.basic_publish(exchange='testexchange', routing_key='hello', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))
connection.close()

运行结果如下: 这里可以看到, hellohello3 队列中都有消息进入,而 hello2 没有,因为他绑定的 routing_key 不是 hello

topic

topic类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到routing_key相匹配的队列中。routing_key中可以存在两种特殊字符串 *# ,用于做模糊匹配,其中 * 用于匹配一个单词, # 用于匹配多规格单词(可以是零个)。 参考如下代码:

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='topic', durable=True)

channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)

channel.queue_bind(exchange='testexchange', queue='hello', routing_key='hello_1.*.*')
channel.queue_bind(exchange='testexchange', queue='hello1', routing_key='hello_1.#')
channel.queue_bind(exchange='testexchange', queue='hello2', routing_key='hello')

for i in range(10):
    channel.basic_publish(exchange='testexchange', routing_key='hello_1', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))

    channel.basic_publish(exchange='testexchange', routing_key='hello_1.a.b', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                          ))
connection.close()

运行结果如下: 我们发出了两条消息, hello_1 只会被 hello_1.# 匹配到,而 hello_1.a.b 会被两个都匹配到。

headers

headers类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在队列与交换器绑定时, 会设定一组键值对规则, 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列.

import pika

user_pwd = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost', credentials=user_pwd))
channel = connection.channel()
channel.exchange_declare(exchange='testexchange', exchange_type='headers', durable=True)

channel.queue_declare(queue='hello', durable=True)
channel.queue_declare(queue='hello1', durable=True)
channel.queue_declare(queue='hello2', durable=True)

channel.queue_bind(exchange='testexchange', queue='hello', arguments={'a': '1'})
channel.queue_bind(exchange='testexchange', queue='hello1', arguments={'b': '2', 'c': '3', 'x-match': 'all'})
channel.queue_bind(exchange='testexchange', queue='hello2', arguments={'a': '1', 'b': '4', 'c': '5', 'x-match': 'any'})

for i in range(10):
    channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                              headers={'a': '1'}
                          ))
    print(" [x] Sent 'Hello World!{}'".format(i))

    channel.basic_publish(exchange='testexchange', routing_key='', body='Hello World!{}'.format(i),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 消息持久化
                              headers={'a': '1', 'b': '2'}
                          ))
connection.close()

运行结果如下: 另外还有消费者的确认机制,我们下篇文章介绍。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章