有关ReabbitMQ的基础知识

(一)基本概念

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。我曾经对这门语言挺有兴趣,学过一段时间,后来没坚持。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。

RabbitMQ的结构图如下:

几个概念说明:

Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

(二)应用实际

我使用Linux服务器(ubuntu 9.10 64位),安装RabbitMQ非常方便。

先运行如下命令安装erlang:

apt-get install erlang-nox

再到rabbitmq.com上下载RabbitMQ的安装包,如下安装:

dpkg -i rabbitmq-server_2.6.1-1_all.deb

安装完后,使用

/etc/init.d/rabbitmq-server start|stop|restart

来启动、停止、重启rabbitmq。

在正式应用之前,我们先在RabbitMQ里创建一个vhost,加一个用户,并设置该用户的权限。

使用rabbitmqctl客户端工具,在根目录下创建”/pyhtest”这个vhost:

rabbitmqctl add_vhost /pyhtest

创建一个用户名”pyh”,设置密码”pyh1234″:

rabbitmqctl add_user pyh pyh1234

设置pyh用户对/pyhtest这个vhost拥有全部权限:

rabbitmqctl set_permissions -p /pyhtest pyh “.*” “.*” “.*”

后面三个”*”代表pyh用户拥有对/pyhtest的配置、写、读全部权限

设置好后,开始编程,我用Perl写一个消息投递程序(producer):

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;
use UUID::Tiny;

my $channel = 1000; # channel ID,可以随意指定,只要不冲突
my $queuename = “pyh_queue”; # 队列名
my $exchange = “pyh_exchange”; # 交换机名
my $routing_key = “test”; # routing key

my $mq = Net::RabbitMQ->new(); # 创建一个RabbitMQ对象

$mq->connect(“localhost”, { vhost => “/pyhtest”, user => “pyh”, password => “pyh1234″ }); # 建立连接
$mq->channel_open($channel); # 打开一个channel
$mq->exchange_declare($channel, $exchange, {durable => 1}); # 声明一个持久化的交换机
$mq->queue_declare($channel, $queuename, {durable => 1}); # 声明一个持久化的队列
$mq->queue_bind($channel, $queuename, $exchange, $routing_key); # 使用routing key在交换机和队列间建立绑定

for (my $i=0;$i<10000000;$i++) { # 循环1000万次
my $string = create_UUID_as_string(UUID_V1); # 产生一条UUID作为消息主体
$mq->publish($channel, $routing_key, $string, { exchange => $exchange }, { delivery_mode => 2 }); # 将消息结合key以持久化模式投递到交换机
}

$mq->disconnect(); # 断开连接

消息接受程序(consumer)大概如下:

#!/usr/bin/perl
use strict;
use Net::RabbitMQ;

my $channel = 1001;
my $queuename = “pyh_queue”;
my $mq = Net::RabbitMQ->new();

$mq->connect(“localhost”, { vhost=>”/pyhtest”, user => “pyh”, password => “pyh1234″ });
$mq->channel_open($channel);

while (1) {
my $hashref = $mq->get($channel, $queuename);
last unless defined $hashref;
print $hashref->{message_count}, “: “, $hashref->{body},”\n”;
}

$mq->disconnect();

consumer连接后只要指定队列就可获取到消息。

上述程序共投递1000万条消息,每条消息36字节(UUID),打开持久化,共耗时17分多钟(包括产生UUID的时间),每秒投递消息约9500条。测试机器是8G内存、8核志强CPU。

投递完后,在/var/lib/rabbitmq/mnesia/rabbit@${hostname}/msg_store_persistent目录,产生2G多的持久化消息数据。在运行consumer程序后,这些数据都会消失,因为消息已经被消费了。



简介


   由于某些原因,今天接触了一下一个新的东西RabbitMQ(  http://www.rabbitmq.com/ )总的来说给人的感觉就是安装简单方便,同时功能强大。而且官网也给出了几个相当实用的例子,不管关于消息队列的持久化却并没有提及,关于持久化的问题我会在后面的文章中再详细说明。不过在天朝想要直接访问RabbitMQ官网有些困难,所以建议还是安装一下fangqiang工具goagent 在Windows和Linux下都可以使用,具体信息还是自己看官网  https://code.google.com/p/goagent/


RabbitMQ的主要特点

    支持一对多方式

        多个Queue绑定到一个Exchange后,通过向Exchange发送消息,就可将信息转发到多个绑定到Exchange的Queue中,

    消息持久化

        如果对消息进行了持久话处理,那么消息队列都将保存到服务器中,即使RabbitMQ服务器停止,下一次启动消息依然存在

    消息序列一致性

        每个消费者对自己的Queue操作,由于Queue是消息是队列形式保存,所以可以保证绑定到同一Exchange的消息队列的信息序列是一致的

    状态一致性

        对于消费者可以通过设置信息分发方式,让消费者每次只从队列种取出一条信息,操作完成并确认后才发送下一条信息,当操作出现异常如宕机,未完成的消息依然保存在服务器,可以保证在下一次消费者程序启动后可以从上一次操作未完成的位置继续执行。

 

 

Linux下安装RabbitMQ



   rabbitMQ是一个消息中间件,负责消息的接受和传递。openstack中貌似也是使用rabbitMQ作为消息中间件,这也是我们选择rabbitMQ的主要原因。保持一致性嘛。
   关于rabbitMQ的安装这里使用APT的安装方式,只要网络比较好,安装起来还是很快的
   首先添加一下内容到 /etc/apt/sources.list中

1 deb http://www.rabbitmq.com/debian/ testing main

为了避免安装时出现错误我们需要将rabbitMQ的公钥添加到我们的信任列表中
1 wget http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
2 sudo   apt-key add rabbitmq-signing-key-public.asc

直接运行 apt-get update
最后直接安装即可
1 sudo   apt-get  install   rabbitmq-server

通过这种方式安装可以避免繁琐的配置方式,对于需要快速了解rabbitMQ的人,可以避免安装过程产生的各种麻烦事情。
安装完成后Rabbit服务器将自动启动,RabbitMQ提供了一些简单实用的命令用于管理服务器运行状态,以及查询服务器状态信息
下面说几个比较常用的,熟悉这些命令对之后分析服务器状态,以及持久化内容都有一定帮组。RabbitMQ的命令都需要在管理员权限下执行
查看服务器运行状态: rabbitmq-server status
启动服务器:rabbitmq-server start
停止服务器:rabbitmq-server stop

查看服务器中所有的消息队列信息 :rabbitmqctl list_queues
查看服务器种所有的路由信息: rabbitmqctl list_exchanges
查看服务器种所有的路由与消息队列绑定信息 :rabbitmq list_bindings



RabbitMQ 之Hello World

     
    rabbitMQ作为一个消息服务中间件负责接受消息和转发,下面给出一个简单的例子用来说明这种工作方式。
一个简单的实现主要包含3种角色生产者(Producing),消息队列(queue),消费者(consuming)。当然rabbitMQ的核心并不在于此,rabbitMQ核心主要包含Exchange和queue,消息持久化操作也是通过这两个核心来完成的。后面文章会有详细说明,现在先看一下简单的消息队列实现的例子。
 

    生产者 Send.java
01 import   com.rabbitmq.client.Channel;
02 import   com.rabbitmq.client.Connection;
03 import   com.rabbitmq.client.ConnectionFactory;
04  
05  
06 public   class   Send {
07  
08      //消息队列名称
09      private   final   static   String QUEUE_NAME= "hello" ;
10       
11      public   static   void   main(String[] args)   throws   java.io.IOException{
12           
13          //创建链接工程
14          ConnectionFactory factory =  new   ConnectionFactory();
15          factory.setHost( "localhost" );
16          //创建链接
17          Connection connection = factory.newConnection();
18           
19          //创建消息通道
20          Channel channel = connection.createChannel();
21           
22          //生命一个消息队列
23          channel.queueDeclare(QUEUE_NAME,  false false false null );
24           
25          String message =  "Hello World" ;
26           
27          //发布消息,第一个参数表示路由(Exchange名称),未""则表示使用默认消息路由
28          channel.basicPublish( "" , QUEUE_NAME,  null , message.getBytes());
29           
30          System.out.println( " [x] Sent '" +message+ "'" );
31           
32         //关闭消息通道和链接
33          channel.close();
34          connection.close();
35           
36      }
37       
38 }

    消费者Recv.java
01 import   com.rabbitmq.client.Channel;
02 import   com.rabbitmq.client.Connection;
03 import   com.rabbitmq.client.ConnectionFactory;
04 import   com.rabbitmq.client.QueueingConsumer;
05  
06 public   class   Recv {
07  
08      //消息队列名称
09      private   final   static   String QUEUE_NAME= "hello" ;
10       
11      public   static   void   main(String[] args)   throws java.io.IOException,java.lang.InterruptedException{
12           
13          //创建链接工厂
14          ConnectionFactory factory =  new   ConnectionFactory();
15          factory.setHost( "localhost" );
16          //创建链接
17          Connection connection = factory.newConnection();
18           
19          //创建消息信道
20          Channel channel = connection.createChannel();
21           
22          //生命消息队列
23          channel.queueDeclare(QUEUE_NAME, false , false , false , null );
24          System.out.println( "[*] Waiting for message. To exist press CTRL+C" );
25           
26          //消费者用于获取消息信道绑定的消息队列中的信息
27          QueueingConsumer consumer =  new   QueueingConsumer(channel);
28           
29          channel.basicConsume(QUEUE_NAME,  true ,consumer);
30           
31          while ( true ){
32               
33              //循环获取消息队列中的信息
34              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
35              String message =  new   String(delivery.getBody());
36              System.out.println( "[x] Received '" +message+ "'" );
37               
38          }
39           
40      }
41       
42 }
    这里并没有做消息的持久化操作,由于某些原因在程序运行过程中可能出现服务器宕机,以及其他外在因素使程序运行出现出错,消息的持久化就是使服务器即使停止后下一次启动依然能保持上一次操作消息队列的状态。可以保证程序依然能从异常开始的地方重新执行。也就是为什么要对消息进行持久化的主要原因。

信息发布与订阅


        Rabbit 的核心组件包含 Queue( 消息队列 ) Exchanges 两部分, Exchange 的主要部分就是对信息进行路由,通过将消息队列绑定到 Exchange 上,则可以实现订阅形式的消息发布及 Publish/Subscribe 在这种模式下消息发布者只需要将信息发布到相应的 Exchange 中,而 Exchange 则自动将信息分发到不同的 Queue 当中。

    这种模式下 Exchange 充当的角色

    在命令行中可以使用

    sudo rabbitmqctl list_exchanges

    sudo rabbitmqctl list_bindings

    分别查看当前系统种存在的 Exchange Exchange 上绑定的 Queue 信息。

    消息发布者 EmitLog.java

01 import   com.rabbitmq.client.Channel;
02 import   com.rabbitmq.client.Connection;
03 import   com.rabbitmq.client.ConnectionFactory;
04  
05 public   class   EmitLog {
06  
07      private   static   final   String  EXCHANGE_NAME= "logs" ;
08       
09      public   static   void   main(String[] args)  throws   java.io.IOException{
10           
11          //创建链接工厂
12          ConnectionFactory factory =  new   ConnectionFactory();
13          factory.setHost( "localhost" );
14          //创建链接
15          Connection connection = factory.newConnection();
16           
17          //创建信息管道
18          Channel channel = connection.createChannel();
19           
20          //生命Exchange 非持久化
21          channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
22           
23          String message =  "Message " +Math.random();
24           
25          //第一个参数是对应的Exchange名称,如果为空则使用默认Exchange
26          channel.basicPublish(EXCHANGE_NAME,  "" null , message.getBytes());
27          System.out.println( "[x] Sent '" +message+ "'" );
28           
29          //关闭链接
30          channel.close();
31          connection.close();
32           
33      }
34       
35 }


    消息消费者 ReceiveLogs.java

01 import   java.io.IOException;
02  
03 import   com.rabbitmq.client.Channel;
04 import   com.rabbitmq.client.Connection;
05 import   com.rabbitmq.client.ConnectionFactory;
06 import   com.rabbitmq.client.ConsumerCancelledException;
07 import   com.rabbitmq.client.QueueingConsumer;
08 import   com.rabbitmq.client.ShutdownSignalException;
09  
10 public   class   ReceiveLogs {
11  
12      private   static   final   String EXCHANGE_NAME =  "logs" ;
13  
14      public   static   void   main(String[] args)  throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
15  
16          //创建链接工厂
17          ConnectionFactory factory =  new   ConnectionFactory();
18          factory.setHost( "localhost" );
19          //创建链接
20          Connection connection = factory.newConnection();
21           
22          //创建消息管道
23          Channel channel = connection.createChannel();
24  
25          //声明Exchange
26          channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" );
27           
28          //利用系统自动声明一个非持久化的消息队列,并返回唯一的队列名称
29          String queueName = channel.queueDeclare().getQueue();
30  
31          //将消息队列绑定到Exchange
32          channel.queueBind(queueName, EXCHANGE_NAME,  "" );
33  
34          System.out.println( " [*] Waiting for messages. To exit press CTRL+C" );
35  
36          //声明一个消费者
37          QueueingConsumer consumer =  new   QueueingConsumer(channel);
38          channel.basicConsume(queueName,  true , consumer);
39  
40          while   ( true ) {
41               
42              //循环获取信息
43              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
44              String message =  new   String(delivery.getBody());
45              System.out.println( " [x] Received '"   + message +  "'" );
46               
47          }
48  
49      }
50  
51 }


    运行时启动一个 EmitLog.java 多个 ReceiveLogs.java 则可以看到发布者每次发布信息,只要绑定到了相应 Exchange 的消费者都可以获取到信息。

RabbitMQ 信息持久化技术

    上面的例子中我们实现了 Publisher/Subscribe 的消息分发方式,但是其中存在一些问题。比如当我们运行一个 ReceiveLog 都对应了一个特定的消息队列,可以利用 list_queues 进行查看,同时这些消息队列是帮到到名为 logs Exchange 中,这是发布消息每个消费者都可以接收到,可以当关闭 ReceiveLog 程序后这些消息队列就都会自动销毁,因为他们是非持久化的。同样对于 EmitLog 程序也一样,每次关闭后之前生命的 Exchange 也将自动销毁。

    这就产生了一些问题。如果当 ReceiveLog 为运行时,此时就并没有一个消息队列是绑定到 Exchange 上的,在发布消息后再启动 ReceiveLog 程序是无法接受到之前发布的信息。这就是为什么要进行消息的持久化。

    通过持久化技术,我们可以生命一个持久化的 Exchange ,以及持久化的 Queue 这样,在把 Queue 绑定到 Exchange 后,即使没有消费者程序运行,信息依然能保存在 Queue 当中,当下次启动消费者程序时依然能获取到发布的所有信息。就好比当一个消费者程序在执行消息序列中的任务时,如果突然出现了异常那么重新启动后,依然能从上一次发生错误的位置继续运行,对于某些需要一个有序性和连续性的操作,这点显的尤为重要。

    下面还是给出一个例子,在持久化过程中,可以借助 list_exchanges,list_bindings,list_queues 来查看服务器中相关信息来帮组分析过程。

    Publisher.java

01 import   com.rabbitmq.client.Channel;
02 import   com.rabbitmq.client.Connection;
03 import   com.rabbitmq.client.ConnectionFactory;
04 import   com.rabbitmq.client.MessageProperties;
05  
06 public   class   Publisher {
07       
08      private   static   final   String  EXCHANGE_NAME= "persi" ; //定义Exchange名称
09      private   static   final   boolean   durable =  true ; //消息队列持久化
10       
11      public   static   void   main(String[] args)  throws   java.io.IOException {
12  
13          ConnectionFactory factory =  new   ConnectionFactory(); //创建链接工厂
14          factory.setHost( "localhost" );
15          Connection connection = factory.newConnection(); //创建链接
16          Channel channel = connection.createChannel(); //创建信息通道
17                   
18          channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" , durable); //创建交换机并生命持久化
19  
20          String message =  "Hello Wrold " +Math.random();
21                  //消息的持久化
22          channel.basicPublish(EXCHANGE_NAME,  "" , MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
23           
24          System.out.println( "[x] Sent '"   + message +  "'" );
25  
26          channel.close();
27          connection.close();
28  
29      }
30       
31 }


    Subscriber.java

01 public   class   Subscriber {
02  
03       
04      //private static final String[] QUEUE_NAMES= {"que_001","que_002","que_003","que_004","que_005"};
05      private   static   final String[] QUEUE_NAMES= { "que_006" , "que_007" , "que_008" , "que_009" , "que_0010" };
06       
07      public   static   void   main(String[] args){
08  
09          for ( int   i= 0 ;i<QUEUE_NAMES.length;i++){
10               
11              SubscriberThead sub =  new   SubscriberThead(QUEUE_NAMES[i]);
12              Thread t =  new   Thread(sub);
13              t.start();
14               
15          }
16           
17      }
18 }



    SubscriberThead.java

01 import   com.rabbitmq.client.Channel;
02 import   com.rabbitmq.client.Connection;
03 import   com.rabbitmq.client.ConnectionFactory;
04 import   com.rabbitmq.client.QueueingConsumer;
05 import   com.rabbitmq.client.AMQP.Queue.DeclareOk;
06  
07 public   class   SubscriberThead  implements   Runnable {
08  
09      private   String queue_name =  null ;
10      private   static   final   String EXCHANGE_NAME =  "persi" ; // 定义交换机名称
11      private   static   final   boolean   durable =  true ; //消息队列持久化
12       
13      public   SubscriberThead(String queue_name) {
14           
15          this .queue_name = queue_name;
16       
17      }
18  
19      @Override
20      public   void   run() {
21  
22          try {
23           
24          ConnectionFactory factory =  new   ConnectionFactory();
25          factory.setHost( "localhost" );
26          Connection connection = factory.newConnection();
27          Channel channel = connection.createChannel();
28  
29          channel.exchangeDeclare(EXCHANGE_NAME,  "fanout" , durable);
30  
31          DeclareOk ok = channel.queueDeclare(queue_name, durable,  false ,
32                  false null );
33          String queueName = ok.getQueue();
34           
35  
36          channel.queueBind(queueName, EXCHANGE_NAME,  "" );
37  
38          System.out.println( " [" +queue_name+ "] Waiting for messages. To exit press CTRL+C" );
39  
40          channel.basicQos( 1 ); //消息分发处理
41          QueueingConsumer consumer =  new   QueueingConsumer(channel);
42          channel.basicConsume(queueName,  false , consumer);
43  
44          while   ( true ) {
45  
46              Thread.sleep( 2000 );
47              QueueingConsumer.Delivery delivery = consumer.nextDelivery();
48              String message =  new   String(delivery.getBody());
49              System.out.println( " [" +queue_name+ "] Received '"   + message +  "'" );
50              channel.basicAck(delivery.getEnvelope().getDeliveryTag(),  false );
51  
52          }
53          } catch (Exception e){
54               
55              e.printStackTrace();
56          }
57           
58  
59      }
60  
61 }


    通过持久化处理后rabbitMQ将保存Exchange信息以及Queue信息,甚至在rabbitMQ服务器关闭后信息依然能保存,这样就提供了消息传递的可靠性

在消息队列RabbitMQ入门介绍里,描述了RabbitMQ的持久性设置。在设置持久化后,消息保存在磁盘上,即使RabbitMQ重启或服务器重启,消息都不会丢失。

RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
(1)exchange持久化,在声明时指定durable => 1
(2)queue持久化,在声明时指定durable => 1
(3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

但是,即使设置了持久化,也不能百分百保证消息不会丢失。有很小的概率在RabbitMQ接受到消息后,还没来得及写到磁盘,就发生重启了。另外,RabbitMQ也不会对每一个消息执行fsync(2),消息可能仅仅写入到缓存,还没来得及flush到硬件存储。因此RabbitMQ的持久性设置并非足够安全,对于普通的工作队列也许够用了。如果需要加强的安全保证,可以把发布消息的代码封装在事务里。

-------------------------------------------------

1      什么是RabbitMQ?

RabbitMQ 是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:


 

单向解耦


 

双向解耦(如:RPC

     例如一个日志系统,很容易使用RabbitMQ简化工作量,一个Consumer可以进行消息的正常处理,另一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由RabbitMQ完成。
 

 

使用RabbitMQ server 需要:

1. ErLang 语言包;

2. RabbitMQ 安装包;

RabbitMQ 同时提供了java的客户端(一个jar包)。

 

2      概念和特性

2.1      交换机(exchange):

1.  接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanout

direct :转发消息到routigKey指定的队列

topic :按规则转发消息(最灵活)

headers :(这个还没有接触到)

fanout :转发消息到所有绑定队列

2.  如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。

3.  一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。

4. topic 类型交换器通过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,但是不匹配stock.nana。

还有一些其他的交换器类型,如header 、failover、system等,现在在当前的RabbitMQ版本中均未实现。

5.  因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。

6.  交换器的属性:

持久性:如果启用,交换器将会在server重启前都有效。

自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。

惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

 

2.2      队列(queue):

1.  队列是RabbitMQ内部对象,存储消息。相同属性的queue可以重复定义。

2.  临时队列。channel.queueDeclare(),有时不需要指定队列的名字,并希望断开连接时删除队列。

3.  队列的属性:

持久性:如果启用,队列将会在server重启前都有效。

自动删除:如果启用,那么队列将会在所有的消费者停止使用之后自动删除掉自身。

惰性:如果没有声明队列,那么在执行到使用的时候会导致异常,并不会主动声明。

排他性:如果启用,队列只能被声明它的消费者使用。

这些性质可以用来创建例如排他和自删除的transient 或者私有队列。这种队列将会在所有链接到它的客户端断开连接之后被自动删除掉。它们只是短暂地连接到server,但是可以用于实现例如RPC或者在AMQ上的对等通信。4. RPC的使用是这样的:RPC客户端声明一个回复队列,唯一命名(例如用UUID),并且是自删除和排他的。然后它发送请求给一些交换器,在消息的reply-to字段中包含了之前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to作为routing key(默认绑定器会绑定所有的队列到默认交换器,名称为“amp.交换器类型名”)发送到默认交换器。注意这仅仅是惯例而已,可以根据和RPC服务器的约定,它可以解释消息的任何属性(甚至数据体)来决定回复给谁。

2.3      消息传递:

1.  消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,可以动态的增加消费者以提高消息的处理能力。

2.  为了实现负载均衡,可以在消费者端通知RabbitMQ,一个消息处理完之后才会接受下一个消息。

channel.basic_qos(prefetch_count=1)

注意:要防止如果所有的消费者都在处理中,则队列中的消息会累积的情况。

3.  消息有14个属性,最常用的几种:

deliveryMode :持久化属性

contentType :编码

replyTo :指定一个回调队列

correlationId :消息id

实例代码:

4.  消息生产者可以选择是否在消息被发送到交换器并且还未投递到队列(没有绑定器存在)和/或没有消费者能够立即处理的时候得到通知。通过设置消息的mandatory和/或immediate属性为真,这些投递保障机制的能力得到了强化。

5.  此外,一个生产者可以设置消息的persistent属性为真。这样一来,server将会尝试将这些消息存储在一个稳定的位置,直到server崩溃。当然,这些消息肯定不会被投递到非持久的队列中。

 

2.4      高可用性(HA):

1.  消息ACK,通知RabbitMQ消息已被处理,可以从内存删除。如果消费者因宕机或链接失败等原因没有发送ACK(不同于ActiveMQ,在RabbitMQ里,消息没有过期的概念),则RabbitMQ会将消息重新发送给其他监听在队列的下一个消费者。

channel.basicConsume(queuename, noAck=false, consumer);

2.  消息和队列的持久化。定义队列时可以指定队列的持久化属性(问:持久化队列如何删除?)

channel.queueDeclare(queuename, durable=true, false, false, null);

发送消息时可以指定消息持久化属性:

channel.basicPublish(exchangeName, routingKey,

            MessageProperties.PERSISTENT_TEXT_PLAIN,

            message.getBytes());

这样,即使RabbitMQ 服务器重启,也不会丢失队列和消息。

3. publisher confirms

4. master/slave 机制,配合Mirrored Queue,这种情况下,publisher会正常发送消息和接收消息的confirm,但对于subscriber来说,需要接收Consumer Cancellation Notifications来得到主节点失败的通知,然后re-consume from the queue,此时要求client有处理重复消息的能力。注意:如果queue在一个新加入的节点上增加了一个slave,此时slave上没有此前queue的信息(目前还没有同步机制)。

(通过命令行或管理插件可以查看哪个slave 是同步的:

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

     当一个slave重新加入mirrored-queue时,如果queue是durable的,则会被清空。

 

2.5      集群(cluster):

1.  不支持跨网段(如需支持,需要shovel或federation插件)

2.  可以随意的动态增加或减少、启动或停止节点,允许节点故障

3.  集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。

4.  集群的配置可以通过命令行,也可以通过配置文件,命令行优先。

 

3      使用

3.1      简易使用流程
 

3.2      RabbitMQ在OpenStack中的使用

 

 

     在Openstack中,组件之间对RabbitMQ使用基本都是“Remote Procedure Calls”的方式。每一个Nova服务(比如计算服务、存储服务等)初始化时会创建两个队列,一个名为“NODE-TYPE.NODE-ID”,另一个名为“NODE-TYPE”,NODE-TYPE是指服务的类型,NODE-ID指节点名称。

     从抽象层面上讲,RabbitMQ的组件的使用类似于下图所示:

每个服务会绑定两个队列到同一个topic 类型的exchange,从不同的队列中接收不同类型的消息。消息的发送者如果关心消息的返回值,则会监听另一个队列,该队列绑定在一个direct类型的exchange。接受者收到消息并处理后,会将消息的返回发送到此exchange。

在Openstack 中,如果不关心消息返回,消息的流程图如下:

 

     如果关心消息返回值,流程图如下:



 

 

3.3      为什么要使用RabbitMQ?

曾经有过一个人做过一个测试( http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html ),发送1 百万个并发消息,对性能有很高的需求,于是作者对比了RabbitMQ、MSMQ、ActiveMQ、ZeroMQueue,整个过程共产生1百万条1K的消息。测试的执行是在一个Windows Vista上进行的,测试结果如下:



     虽然ZeroMQ性能较高,但这个产品不提供消息持久化,需要自己实现审计和数据恢复,因此在易用性和HA上不是令人满意,通过测试结果可以看到,RabbitMQ的性能确实不错。

     我在本机也做了一些测试,但我的测试是基于组件的原生配置,没有做任何的配置优化,因此总觉的不靠谱。我只测试了RabbitMQ和ActiveMQ两款产品,虽然网上都说ActiveMQ性能不如前者,但平心而论,ActiveMQ提供了很多配置,存在很大的调优空间,也许修改一个配置参数就会使组件的性能有一个质的飞跃。








我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章