ActiveMQ整合Spring JMS

JMS即Java消息服务(Java Message Service),是Java平台上的一套关于消息中间件的规范,或者说是一套统一的API。支持JMS的消息中间件有很多,ActiveMQ算是其中最常用的一个。

JMS两种模型

JMS支持以下两种模型,本文将会对这两种模型分别介绍如何整合Spring:

  • 点对点(Point-to-Point),对应的destination是Queue
  • 发布订阅(Publish/Subscribe),对应的destination是Topic

建立连接

无论是点对点还是发布订阅,生产者或者消费者,第一步是要获取连接。ActiveMQ提供了 org.apache.activemq.pool.PooledConnectionFactory 连接池,类似于常用的数据库连接池,用于管理和复用连接。

<!-- ActiveMQ连接池 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
        </bean>
    </property>
</bean>

生产者

Spring提供了JmsTemplate可以很方便的发送消息。

点对点模型

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination">
        <bean class="org.apache.activemq.command.ActiveMQQueue"> <!-- 指定Destination为Queue(点对点模型) -->
            <constructor-arg value="testqueue" /> <!-- Queue name -->
        </bean>
    </property>
</bean>

发布订阅模型

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination">
        <bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination为Topic(发布订阅模型) -->
            <constructor-arg value="testtopic" /> <!-- Topic name -->
        </bean>
    </property>
</bean>

发送消息代码

使用上面定义的JmsTemplate,通过JmsTemplate.send方法可以发送一条文本类型消息。

@Autowired
private JmsTemplate jmsTemplate;

public void send(String mesasage) {
    jmsTemplate.send(new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(mesasage);
        }
    });
}

如果你正在使用Java 8,使用Lambda表达式发送消息会更加方便:

@Autowired
private JmsTemplate jmsTemplate;

public void send(String mesasage) {
    jmsTemplate.send(session -> session.createTextMessage(mesasage));
}

消费者

JmsTemplate也可以作为消费者使用,但是它是同步的。下面介绍Spring JMS提供的异步的消费者方案。

点对点模型

<!-- 消息监听器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination">
        <bean class="org.apache.activemq.command.ActiveMQQueue"> <!-- 指定Destination为Queue(点对点模型) -->
            <constructor-arg value="testqueue" /> <!-- Queue name -->
        </bean>
    </property>
    <property name="messageListener">
        <bean class="com.xxg.jms.listener.ConsumerMessageListener" />
    </property>
</bean>

发布订阅模型

<!-- 消息监听器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination">
        <bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination为Topic(发布订阅模型) -->
            <constructor-arg value="testtopic" /> <!-- Topic name -->
        </bean>
    </property>
    <property name="messageListener">
        <bean class="com.xxg.jms.listener.ConsumerMessageListener" />
    </property>
</bean>

发布订阅模型消息者持久订阅(Durable Subscriber)

上面给出的是非持久订阅的发布订阅模型消费者,这里来单独说一下持久订阅。需要注意的是,持久订阅(Durable Subscriber)并非消息持久化(DeliveryMode.PERSISTENT),这是两个不同的概念。

非持久订阅的消费者,如果消费者程序挂了,那么挂了的这段时间的消息是收不到的,即使再重启起来也收不到。持久订阅消费者可以让消费者在重启后任然能收到停止的这段时间的消息,避免遗漏。

要想使用持久订阅,需要给消费者设置一个唯一的client ID和Subscriber Name,这样可以让ActiveMQ记住这个消费者,当消费者断开连接程序停止,ActiveMQ也会给这个消费者保留这段时间内的消息,下次同一个消费者(client ID和Subscriber Name相同)重新连接上还能收到消息。

<!-- ActiveMQ连接池 -->
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
    <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://localhost:61616" />
            <property name="clientID" value="clientID_123456" />  <!-- 指定一个唯一的clientID -->
        </bean>
    </property>
</bean>

<!-- 消息监听器 -->
<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination">
        <bean class="org.apache.activemq.command.ActiveMQTopic"> <!-- 指定Destination为Topic(发布订阅模型) -->
            <constructor-arg value="testtopic" /> <!-- Topic name -->
        </bean>
    </property>
    <property name="messageListener">
        <bean class="com.xxg.jms.listener.ConsumerMessageListener" />
    </property>
    <property name="durableSubscriptionName" value="durableSubscriptionName_123456" /> <!-- 指定一个唯一的Name -->
</bean>

接收消息代码

public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

参考资料

我来评几句
登录后评论

已发表评论数()

相关站点

热门文章