MENU

Spring整合JMS——基于ActiveMQ实现

JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

1. Spring整合JMS

对JMS做了一个简要介绍之后,接下来就讲一下Spring整合JMS的具体过程。JMS只是一个标准,真正在使用它的时候我们需要有它的具体实现,这里我们就使用Apache的ActiveMQ来作为它的实现。所使用的依赖利用Maven来进行管理,具体依赖如下:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring-version}</version>
    </dependency>
    <dependency>
        <groupId>javax.annotation</groupId>
        <artifactId>jsr250-api</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
</dependencies>

1.1 ActiveMQ准备

既然是使用的Apache的ActiveMQ作为JMS的实现,那么首先我们应该到Apache官网上下载activeMQ(http://activemq.apache.org/download.html),进行解压后运行其bin目录下面的activemq.bat文件启动ActiveMQ。

1.2 配置ConnectionFactory

ConnectionFactory是用于产生到JMS服务器的链接的,Spring为我们提供了多个ConnectionFactory,有SingleConnectionFactoryCachingConnectionFactorySingleConnectionFactory对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connectionclose方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存SessionMessageProducerMessageConsumer。这里我们使用SingleConnectionFactory来作为示例。

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"/>

这样就定义好产生JMS服务器链接的ConnectionFactory了吗?答案是非也。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory的,真正产生到JMS服务器链接的ConnectionFactory还得是由JMS服务厂商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我们这里使用的是ActiveMQ实现的JMS,所以在我们这里真正的可以产生Connection的就应该是由ActiveMQ提供的ConnectionFactory。所以定义一个ConnectionFactory的完整代码应该如下所示:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
    
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>

ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory可以用来将ConnectionSessionMessageProducer池化,这样可以大大的减少我们的资源消耗。当使用PooledConnectionFactory时,我们在定义一个ConnectionFactory时应该是如下定义:

<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
    
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
    <property name="connectionFactory" ref="targetConnectionFactory"/>
    <property name="maxConnections" value="10"/>
</bean>
    
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
</bean>

1.3 配置生产者

配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的,所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发,为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象。

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

在真正利用JmsTemplate进行消息发送的时候,我们需要知道消息发送的目的地,即destination。在Jms中有一个用来表示目的地的Destination接口,它里面没有任何方法定义,只是用来做一个标识而已。当我们在使用JmsTemplate进行消息发送时没有指定destination的时候将使用默认的Destination。默认Destination可以通过在定义jmsTemplate bean对象时通过属性defaultDestinationdefaultDestinationName来进行注入,defaultDestinationName对应的就是一个普通字符串。在ActiveMQ中实现了两种类型的Destination,一个是点对点的ActiveMQQueue,另一个就是支持订阅/发布模式的ActiveMQTopic。在定义这两种类型的Destination时我们都可以通过一个name属性来进行构造,如:

<!--这个是队列目的地,点对点的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>queue</value>
    </constructor-arg>
</bean>
<!--这个是主题目的地,一对多的-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic"/>
</bean>

假设我们定义了一个ProducerService,里面有一个向Destination发送纯文本消息的方法sendMessage,那么我们的代码就大概是这个样子:

package com.tiantian.springintejms.service.impl;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.tiantian.springintejms.service.ProducerService;

@Component
public class ProducerServiceImpl implements ProducerService {

    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination, final String message) {
        System.out.println("---------------生产者发送消息-----------------");
        System.out.println("---------------生产者发了一个消息:" + message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    } 
    
    public JmsTemplate getJmsTemplate() {
        returnjmsTemplate;
    } 

    @Resource
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
 
}

我们可以看到在sendMessage方法体里面我们是通过jmsTemplate来发送消息到对应的Destination的。到此,我们生成一个简单的文本消息并把它发送到指定目的地Destination的生产者就配置好了。

1.4 配置消费者

生产者往指定目的地Destination发送消息后,接下来就是消费者对指定目的地的消息进行消费了。那么消费者是如何知道有生产者发送消息到指定目的地Destination了呢?这是通过Spring为我们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。

每个消费者对应每个目的地都需要有对应的MessageListenerContainer。对于消息监听容器而言,除了要知道监听哪个目的地之外,还需要知道到哪里去监听,也就是说它还需要知道去监听哪个JMS服务器,这是通过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。

所以我们在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener

Spring一共为我们提供了两种类型的MessageListenerContainer

  • SimpleMessageListenerContainer
  • DefaultMessageListenerContainer。

SimpleMessageListenerContainer会在一开始的时候就创建一个会话session和消费者Consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制。

大多数情况下我们还是使用的DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对JMS提供者要求低、先进功能如事务参与和兼容Java EE环境。

定义处理消息的 MessageListener

要定义处理消息的MessageListener我们只需要实现JMS规范中的MessageListener接口就可以了。MessageListener接口中只有一个方法onMessage方法,当接收到消息的时候会自动调用该方法。

package com.tiantian.springintejms.listener;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一个纯文本消息。");
        try {
            System.out.println("消息内容是:" + textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

有了MessageListener之后我们就可以在Spring的配置文件中配置一个消息监听容器了。

<!--这个是队列目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>queue</value>
    </constructor-arg>
</bean>
<!-- 消息监听器 -->
<bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>   

<!-- 消息监听容器 -->
<bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
</bean>

我们可以看到我们定义了一个名叫queueActiveMQQueue目的地,我们的监听器就是监听了发送到这个目的地的消息。

至此我们的生成者和消费者都配置完成了,这也就意味着我们的整合已经完成了。这个时候完整的Spring的配置文件应该是这样的:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
 
    <context:component-scan base-package="com.tiantian" />
 
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!--这个是队列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!-- 消息监听器 -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
    <!-- 消息监听容器 -->
    <bean id="jmsContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>
</beans> 

接着我们来测试一下,看看我们的整合是否真的成功了,测试代码如下:

package com.tiantian.springintejms.test;
 
import javax.jms.Destination;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.tiantian.springintejms.service.ProducerService;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;
    
    @Test
    public void testSend() {
        for (int i=0; i<2; i++) {
            producerService.sendMessage(destination, "你好,生产者!这是消息:" + (i+1));
        }
    }
    
}

在上面的测试代码中我们利用生产者发送了两个消息,正常来说,消费者应该可以接收到这两个消息。运行测试代码后控制台输出如下:

img

看,控制台已经进行了正确的输出,这说明我们的整合确实是已经成功了。

2. 消息监听器MessageListener

在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListenerSessionAwareMessageListenerMessageListenerAdapter。下面就分别来介绍一下这几种类型的区别。

2.1 MessageListener

MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消费者的时候用的消息监听器就是MessageListener,代码如下:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一个纯文本消息。");
        try {
            System.out.println("消息内容是:" + textMsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

2.2  SessionAwareMessageListener

SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个ConnectionSessionSessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。先来看一段代码:

package com.tiantian.springintejms.listener;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.jms.listener.SessionAwareMessageListener;
 
public class ConsumerSessionAwareMessageListener implements
        SessionAwareMessageListener<TextMessage> {
 
    private Destination destination;
    
    public void onMessage(TextMessage message, Session session) throws JMSException {
        System.out.println("收到一条消息");
        System.out.println("消息内容是:" + message.getText());
        MessageProducer producer = session.createProducer(destination);
        Message textMessage = session.createTextMessage("ConsumerSessionAwareMessageListener。。。");
        producer.send(textMessage);
    }
 
    public Destination getDestination() {
        returndestination;
    }
 
    public void setDestination(Destination destination) {
        this.destination = destination;
    }
 
}

在上面代码中我们定义了一个SessionAwareMessageListener,在这个Listener中我们在接收到了一个消息之后,利用对应的Session创建了一个到destination的生产者和对应的消息,然后利用创建好的生产者发送对应的消息。

接着我们在Spring的配置文件中配置该消息监听器将处理来自一个叫sessionAwareQueue的目的地的消息,并且往该MessageListener中通过set方法注入其属性destination的值为queueDestination。这样当我们的SessionAwareMessageListener接收到消息之后就会往queueDestination发送一个消息。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
 
    <context:component-scan base-package="com.tiantian" /> 
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>
    
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
    </bean>
    
    <!--这个是队列目的地-->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>queue</value>
        </constructor-arg>
    </bean>
    <!--这个是sessionAwareQueue目的地-->
    <bean id="sessionAwareQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>sessionAwareQueue</value>
        </constructor-arg>
    </bean>
    <!-- 消息监听器 -->
    <bean id="consumerMessageListener" class="com.tiantian.springintejms.listener.ConsumerMessageListener"/>
    <!-- 可以获取session的MessageListener -->
    <bean id="consumerSessionAwareMessageListener" class="com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener">
        <property name="destination" ref="queueDestination"/>
    </bean>
    <!-- 消息监听容器 -->
    <bean id="jmsContainer"        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="consumerMessageListener" />
    </bean>
    
    <bean id="sessionAwareListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="sessionAwareQueue" />
        <property name="messageListener" ref="consumerSessionAwareMessageListener" />
    </bean>
</beans>

接着我们来做一个测试,测试代码如下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;
    @Autowired
    @Qualifier("sessionAwareQueue")
    private Destination sessionAwareQueue;
    
    @Test
    public void testSessionAwareMessageListener() {
        producerService.sendMessage(sessionAwareQueue, "测试SessionAwareMessageListener");
    }
    
}

在上述测试代码中,我们通过前面定义好的生产者往我们定义好的SessionAwareMessageListener监听的sessionAwareQueue发送了一个消息。程序运行之后控制台输出如下:

img

这说明我们已经成功的往sessionAwareQueue发送了一条纯文本消息,消息会被ConsumerSessionAwareMessageListeneronMessage方法进行处理,在onMessage方法中ConsumerSessionAwareMessageListener就是简单的把接收到的纯文本信息的内容打印出来了,之后再往queueDestination发送了一个纯文本消息,消息内容是ConsumerSessionAwareMessageListener…,该消息随后就被ConsumerMessageListener处理了,根据我们的定义,在ConsumerMessageListener中也只是简单的打印了一下接收到的消息内容。

2.3  MessageListenerAdapter

MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。

MessageListenerAdapter会把接收到的消息做如下转换:

  • TextMessage转换为String对象;
  • BytesMessage转换为byte数组;
  • MapMessage转换为Map对象;
  • ObjectMessage转换为对应的Serializable对象。

既然前面说了MessageListenerAdapter会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器——一个普通的Java类进行处理(如果真正的目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为参数调用它们的onMessage方法,而不会再利用反射去进行调用),那么我们在定义一个MessageListenerAdapter的时候就需要为它指定这样一个目标类。这个目标类我们可以通过MessageListenerAdapter的构造方法参数指定,如:

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
<bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </constructor-arg>
</bean>

也可以通过它的delegate属性来指定,如:

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate">
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </property>
    <property name="defaultListenerMethod" value="receiveMessage"/>
</bean>

前面说了如果我们指定的这个目标处理器是一个MessageListener或者是一个SessionAwareMessageListener的时候Spring将直接利用接收到的Message对象作为方法参数调用它们的onMessage方法。但是如果指定的目标处理器是一个普通的Java类时Spring将利用Message进行了类型转换之后的对象作为参数通过反射去调用真正的目标处理器的处理方法,那么Spring是如何知道该调用哪个方法呢?这是通过MessageListenerAdapterdefaultListenerMethod属性来决定的,当我们没有指定该属性时,Spring会默认调用目标处理器的handleMessage方法。

接下来我们来看一个示例,假设我们有一个普通的Java类ConsumerListener,其对应有两个方法,handleMessagereceiveMessage,其代码如下:

package com.tiantian.springintejms.listener;
 
public class ConsumerListener {
 
    public void handleMessage(String message) {
        System.out.println("ConsumerListener通过handleMessage接收到一个纯文本消息,消息内容是:" + message);
    }
    
    public void receiveMessage(String message) {
        System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);
    }
    
}

假设我们要把它作为一个消息监听器来监听发送到adapterQueue的消息,这个时候我们就可以定义一个对应的MessageListenerAdapter来把它当做一个MessageListener使用。

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate">
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </property>
    <property name="defaultListenerMethod" value="receiveMessage"/>
</bean>

当然,有了MessageListener之后我们还需要配置其对应的MessageListenerContainer,这里配置如下:

<!-- 消息监听适配器对应的监听容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adapterQueue"/>
    <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->
</bean>

<!-- 用于测试消息监听适配器的队列目的地 -->
<bean id="adapterQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>adapterQueue</value>
    </constructor-arg>
</bean>

在上面的MessageListenerAdapter中我们指定了其defaultListenerMethod属性的值为receiveMessage,所以当MessageListenerAdapter接收到消息之后会自动的调用我们指定的ConsumerListenerreceiveMessage方法。

针对于上述代码我们定义测试代码如下:

package com.tiantian.springintejms.test;
 
import javax.jms.Destination;
 
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
import com.tiantian.springintejms.service.ProducerService;
 
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {

    @Autowired
    @Qualifier("adapterQueue")
    private Destination adapterQueue;

    @Test
    public void testMessageListenerAdapter() {
        producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");
    }
    
} 

这时候我们会看到控制台输出如下:

img

如果我们不指定MessageListenerAdapterdefaultListenerMethod属性,那么在运行上述代码时控制台会输出如下结果:

img

MessageListenerAdapter除了会自动的把一个普通Java类当做MessageListener来处理接收到的消息之外,其另外一个主要的功能是可以自动的发送返回消息

当我们用于处理接收到的消息的方法的返回值不为空的时候,Spring会自动将它封装为一个JMS Message,然后自动进行回复。那么这个时候这个回复消息将发送到哪里呢?这主要有两种方式可以指定。

第一,可以通过发送的Message的setJMSReplyTo方法指定该消息对应的回复消息的目的地。这里我们把我们的生产者发送消息的代码做一下修改,在发送消息之前先指定该消息对应的回复目的地为一个叫responseQueue的队列目的地,具体代码如下所示:

package com.tiantian.springintejms.service.impl;
 
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
 
import com.tiantian.springintejms.service.ProducerService;
 
@Component
public class ProducerServiceImpl implements ProducerService { 

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    @Qualifier("responseQueue")
    private Destination responseDestination;
    
    public void sendMessage(Destination destination, final String message) {
        System.out.println("---------------生产者发送消息-----------------");
        System.out.println("---------------生产者发了一个消息:" + message);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage(message);
                textMessage.setJMSReplyTo(responseDestination);
                return textMessage;
            }
        });
    }
 
}

接着定义一个叫responseQueue的队列目的地及其对应的消息监听器和监听容器。

<!-- 用于测试消息回复的 -->
<bean id="responseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>responseQueue</value>
    </constructor-arg>
</bean>

<!-- responseQueue对应的监听器 -->
<bean id="responseQueueListener" class="com.tiantian.springintejms.listener.ResponseQueueListener"/>

<!-- responseQueue对应的监听容器 -->
<bean id="responseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="responseQueue"/>
    <property name="messageListener" ref="responseQueueListener"/>
</bean>

ResponseQueueListener的定义如下所示:

public class ResponseQueueListener implements MessageListener {
 
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收到发送到responseQueue的一个文本消息,内容是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}

接着把我们接收消息的ConsumerListener的receiveMessage方法改为如下:

/**
 * 当返回类型是非null时MessageListenerAdapter会自动把返回值封装成一个Message,然后进行回复
 * @param message
 * @return
 */
public String receiveMessage(String message) {
    System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);
    return "这是ConsumerListener对象的receiveMessage方法的返回值。";
}

我们可以看到在上述负责接收消息的receiveMessage方法有一个非空的返回值。

接着我们运行我们的测试代码,利用生产者往我们定义好的MessageListenerAdapter负责处理的adapterQueue目的地发送一个消息。测试代码如下所示:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext.xml")
public class ProducerConsumerTest {
 
    @Autowired
    private ProducerService producerService;

    @Qualifier("adapterQueue")
    @Autowired
    private Destination adapterQueue;   

    @Test
    public void testMessageListenerAdapter() {
        producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");
    }
    
}

运行上述测试代码之后,控制台输出如下:

img

这说明我们的生产者发送消息被MessageListenerAdapter处理之后,MessageListenerAdapter确实把监听器的返回内容封装成一个Message往原Message通过setJMSReplyTo方法指定的回复目的地发送了一个消息。对于MessageListenerAdapter对应的监听器处理方法返回的是一个null值或者返回类型是void的情况,MessageListenerAdapter是不会自动进行消息的回复的,有兴趣的网友可以自己测试一下。

第二,通过MessageListenerAdapter的defaultResponseDestination属性来指定。这里我们也来做一个测试,首先维持生产者发送消息的代码不变,即发送消息前不通过Message的setJMSReplyTo方法指定消息的回复目的地;接着我们在定义MessageListenerAdapter的时候通过其defaultResponseDestination属性指定其默认的回复目的地是defaultResponseQueue,并定义defaultResponseQueue对应的消息监听器和消息监听容器。

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <!-- <constructor-arg>
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </constructor-arg> -->
    <property name="delegate">
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </property>
    <property name="defaultListenerMethod" value="receiveMessage"/>
    <property name="defaultResponseDestination" ref="defaultResponseQueue"/>
</bean>

<!-- 消息监听适配器对应的监听容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adapterQueue"/>
    <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->
</bean>

<!-- 默认的消息回复队列 -->
<bean id="defaultResponseQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
        <value>defaultResponseQueue</value>
    </constructor-arg>
</bean>

<!-- defaultResponseQueue对应的监听器 -->
<bean id="defaultResponseQueueListener" class="com.tiantian.springintejms.listener.DefaultResponseQueueListener"/>

<!-- defaultResponseQueue对应的监听容器 -->
<bean id="defaultResponseQueueMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="defaultResponseQueue"/>
    <property name="messageListener" ref="defaultResponseQueueListener"/>
</bean>

DefaultResponseQueueListener的代码如下所示:

package com.tiantian.springintejms.listener;
 
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
 
public class DefaultResponseQueueListener implements MessageListener {
 
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("DefaultResponseQueueListener接收到发送到defaultResponseQueue的一个文本消息,内容是:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}

这时候运行如下测试代码:

@Test
public void testMessageListenerAdapter() {
    producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter");
}

控制台将输出如下内容:

img

这说明MessageListenerAdapter会自动把真正的消息处理器返回的非空内容封装成一个Message发送回复消息到通过defaultResponseDestination属性指定的默认消息回复目的地。

既然我们可以通过两种方式来指定MessageListenerAdapter自动发送回复消息的目的地,那么当我们两种方式都指定了而且它们的目的地还不一样的时候会怎么发送呢?是两个都发还是只发其中的一个呢?关于这部分的测试我这里就不赘述了,有兴趣的网友可以自己进行。这里我可以直接的告诉大家,当两种方式都指定了消息的回复目的地的时候使用发送消息的setJMSReplyTo方法指定的目的地将具有较高的优先级,MessageListenerAdapter将只往该方法指定的消息回复目的地发送回复消息。

3. 消息转换器MessageConverter

MessageConverter的作用主要有两方面,一方面它可以把我们的非标准化Message对象转换成我们的目标Message对象,这主要是用在发送消息的时候;另一方面它又可以把我们的Message对象转换成对应的目标对象,这主要是用在接收消息的时候。

下面我们就拿发送一个对象消息来举例,假设我们有这样一个需求:我们平台有一个发送邮件的功能,进行发送的时候我们只是把我们的相关信息封装成一个JMS消息,然后利用JMS进行发送,在对应的消息监听器进行接收到的消息处理时才真正的进行消息发送。

假设我们有这么一个Email对象:

public class Email implements Serializable {
 
    private static final long serialVersionUID = -658250125732806493L;
 
    private String receiver;
    private String title;
    private String content;
 
    public Email(String receiver, String title, String content) {
        this.receiver = receiver;
        this.title = title;
        this.content = content;
    }
 
    public String getReceiver() {
        return receiver;
    }
 
    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }
 
    public String getTitle() {
        return title;
    }
 
    public void setTitle(String title) {
        this.title = title;
    }
 
    public String getContent() {
        return content;
    }
 
    public void setContent(String content) {
        this.content = content;
    }
 
    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append("Email [receiver=").append(receiver).append(", title=")
                .append(title).append(", content=").append(content).append("]");
        return builder.toString();
    }
    
}

这个Email对象包含了一个简单的接收者email地址、邮件主题和邮件内容。我们在发送的时候就把这个对象封装成一个ObjectMessage进行发送。代码如下所示:

public class ProducerServiceImpl implements ProducerService {
 
    @Autowired
    private JmsTemplate jmsTemplate;    

    public void sendMessage(Destination destination, final Serializable obj) {
        jmsTemplate.send(destination, new MessageCreator() {
 
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage objMessage = session.createObjectMessage(obj);
                return objMessage;
            }
            
        });
    }
 
}

这是对应的在没有使用MessageConverter的时候我们需要new一个MessageCreator接口对象,然后在其抽象方法createMessage内部使用session创建一个对应的消息。在使用了MessageConverter的时候我们在使用JmsTemplate进行消息发送时只需要调用其对应的convertAndSend方法即可。如:

public void sendMessage(Destination destination, final Serializable obj) {
    //未使用MessageConverter的情况
    /*jmsTemplate.send(destination, new MessageCreator() {
 
        public Message createMessage(Session session) throws JMSException {
            ObjectMessage objMessage = session.createObjectMessage(obj);
            return objMessage;
        }

    });*/
  
    //使用MessageConverter的情况
    jmsTemplate.convertAndSend(destination, obj);
}

这样JmsTemplate就会在其内部调用预定的MessageConverter对我们的消息对象进行转换,然后再进行发送。

这个时候我们就需要定义我们的MessageConverter了。要定义自己的MessageConverter很简单,只需要实现Spring为我们提供的MessageConverter接口即可。我们先来看一下MessageConverter接口的定义:

public interface MessageConverter {
 
    Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;
 
    Object fromMessage(Message message) throws JMSException, MessageConversionException;
 
}

我们可以看到其中一共定义了两个方法fromMessagetoMessagefromMessage是用来把一个JMS Message转换成对应的Java对象,而toMessage方法是用来把一个Java对象转换成对应的JMS Message。因为我们已经知道上面要发送的对象就是一个Email对象,所以在这里我们就简单地定义一个EmailMessageConverter用来把Email对象和对应的ObjectMessage进行转换,其代码如下:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
 
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
 
public class EmailMessageConverter implements MessageConverter {
 
    public Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {
        return session.createObjectMessage((Serializable) object);
    }
 
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        ObjectMessage objMessage = (ObjectMessage) message;
        return objMessage.getObject();
    }
 
}

这样当我们利用JmsTemplate的convertAndSend方法发送一个Email对象的时候就会把对应的Email对象当做参数调用我们定义好的EmailMessageConverter的toMessage方法。

定义好我们的EmailMessageConverter之后就需要指定我们用来发送Email对象的JmsTemplate对象的messageConverter为EmailMessageConverter,这里我们在Spring的配置文件中定义JmsTemplate bean的时候就指定:

<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
    <property name="connectionFactory" ref="connectionFactory"/>
    <!-- 消息转换器 -->
    <property name="messageConverter" ref="emailMessageConverter"/>
</bean>
<!-- 类型转换器 -->
<bean id="emailMessageConverter" class="com.tiantian.springintejms.converter.EmailMessageConverter"/>

到此我们的MessageConverter就定义好了,也能够进行使用了,接着我们来进行测试一下,定义测试代码如下所示:

@Test
public void testObjectMessage() {
    Email email = new Email("zhangsan@xxx.com", "主题", "内容");
    producerService.sendMessage(destination, email);
}

上面destination对应的接收处理的MessageListener方法如下所示:

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        
        if (message instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) message;
            try {
                Object obj = objMessage.getObject();
                Email email = (Email) obj;
                System.out.println("接收到一个ObjectMessage,包含Email对象。");
                System.out.println(email);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
 
}

之前说了MessageConverter有两方面的功能,除了把Java对象转换成对应的Jms Message之外还可以把Jms Message转换成对应的Java对象。我们看上面的消息监听器在接收消息的时候接收到的就是一个Jms Message,如果我们要利用MessageConverter来把它转换成对应的Java对象的话,只能是我们往里面注入一个对应的MessageConverter,然后在里面手动的调用,如: 

public class ConsumerMessageListener implements MessageListener {
 
    private MessageConverter messageConverter;
    
    public void onMessage(Message message) {
        
        if (message instanceof ObjectMessage) {
            ObjectMessage objMessage = (ObjectMessage) message;
            try {
                /*Object obj = objMessage.getObject();
                Email email = (Email) obj;*/
                Email email = (Email) messageConverter.fromMessage(objMessage);
                System.out.println("接收到一个ObjectMessage,包含Email对象。");
                System.out.println(email);
            } catch (JMSException e) {
                e.printStackTrace();
            }
            
        }
    }
 
    public MessageConverter getMessageConverter() {
        return messageConverter;
    }
 
    public void setMessageConverter(MessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
 
}

当我们使用MessageListenerAdapter来作为消息监听器的时候,我们可以为它指定一个对应的MessageConverter,这样Spring在处理接收到的消息的时候就会自动地利用我们指定的MessageConverter对它进行转换,然后把转换后的Java对象作为参数调用指定的消息处理方法。这里我们再把前面讲解MessageListenerAdapter时定义的MessageListenerAdapter拿来做一个测试,我们指定它的MessageConverter为我们定义好的EmailMessageConverter。

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate">
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </property>
    <property name="defaultListenerMethod" value="receiveMessage"/>
    <property name="messageConverter" ref="emailMessageConverter"/>
</bean>

<!-- 消息监听适配器对应的监听容器 -->
<bean id="messageListenerAdapterContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="adapterQueue"/>
    <property name="messageListener" ref="messageListenerAdapter"/><!-- 使用MessageListenerAdapter来作为消息监听器 -->
</bean>

然后在我们的真正用于处理接收到的消息的ConsumerListener中添加一个receiveMessage方法,添加后其代码如下所示:

public class ConsumerListener {
 
    public void receiveMessage(String message) {
        System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息,消息内容是:" + message);
    }
    
    public void receiveMessage(Email email) {
        System.out.println("接收到一个包含Email的ObjectMessage。");
        System.out.println(email);
    }
    
}

然后我们定义如下测试代码:

@Test
public void testObjectMessage() {
    Email email = new Email("zhangsan@xxx.com", "主题", "内容");
    producerService.sendMessage(adapterQueue, email);
}

因为我们给MessageListenerAdapter指定了一个MessageConverter,而且是一个EmailMessageConverter,所以当MessageListenerAdapter接收到一个消息后,它会调用我们指定的MessageConverter的fromMessage方法把它转换成一个Java对象,根据定义这里会转换成一个Email对象,然后会把这个Email对象作为参数调用我们通过defaultListenerMethod属性指定的默认处理器方法,根据定义这里就是receiveMessage方法,但是我们可以看到在ConsumerListener中我们一共定义了两个receiveMessage方法,因为是通过转换后的Email对象作为参数进行方法调用的,所以这里调用的就应该是参数类型为Email的receiveMessage方法了。上述测试代码运行后会输出如下结果:

img

说到这里可能有读者就会有疑问了,说我们在之前讲解MessageListenerAdapter的时候不是没有指定对应的MessageConverter,然后发送了一个TextMessage,结果Spring还是把它转换成一个String对象,调用了ConsumerListener参数类型为String的receiveMessage方法吗?那你这个MessageConverter在MessageListenerAdapter进行消息接收的时候也没什么用啊。

其实还是有用的,在我们使用MessageListenerAdapter时,在对其进行初始化也就是调用其构造方法时,它会默认new一个Spring已经为我们实现了的MessageConverter——SimpleMessageConverter作为其默认的MessageConverter,这也就是为什么我们在使用MessageListenerAdapter的时候不需要指定MessageConverter但是消息还是会转换成对应的Java对象的原因。所以默认情况下我们使用MessageListenerAdapter时其对应的MessageListener的处理器方法参数类型必须是一个普通Java对象,而不能是对应的Jms Message对象。

那如果我们在处理Jms Message的时候想使用MessageListenerAdapter,然后又希望处理最原始的Message,而不是经过MessageConverter进行转换后的Message该怎么办呢?这个时候我们只需要在定义MessageListenerAdapter的时候指定其MessageConverter为空就可以了。

<!-- 消息监听适配器 -->
<bean id="messageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate">
        <bean class="com.tiantian.springintejms.listener.ConsumerListener"/>
    </property>
    <property name="defaultListenerMethod" value="receiveMessage"/>
    <property name="messageConverter">
        <null/>
    </property>
</bean>

那么这个时候我们的真实MessageListener的处理器方法参数类型就应该是Jms Message或对应的Jms Message子类型了,不然就会调用不到对应的处理方法了。这里因为我们发送的是一个ObjectMessage,所以这里就添加一个对应的参数类型为ObjectMessage的receiveMessage方法了。

public void receiveMessage(ObjectMessage message) throws JMSException {
    System.out.println(message.getObject());
}

刚刚讲到Spring已经为我们实现了一个简单的MessageConverter,即org.springframework.jms.support.converter.SimpleMessageConverter,其实Spring在初始化JmsTemplate的时候也指定了其对应的MessageConverter为一个SimpleMessageConverter,所以如果我们平常没有什么特殊要求的时候可以直接使用JmsTemplate的convertAndSend系列方法进行消息发送,而不必繁琐的在调用send方法时自己new一个MessageCreator进行相应Message的创建。

这里我们也来看一下SimpleMessageConverter的定义,如果觉得它不能满足你的要求,那我们可以对它里面的部分方法进行重写,或者是完全实现自己的MessageConverter。

public class SimpleMessageConverter implements MessageConverter {
 
    public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
        if (object instanceof Message) {
            return (Message) object;
        }
        else if (object instanceof String) {
            return createMessageForString((String) object, session);
        }
        else if (object instanceof byte[]) {
            return createMessageForByteArray((byte[]) object, session);
        }
        else if (object instanceof Map) {
            return createMessageForMap((Map) object, session);
        }
        else if (object instanceof Serializable) {
            return createMessageForSerializable(((Serializable) object), session);
        }

        else {
            throw new MessageConversionException("Cannot convert object of type [" +
                    ObjectUtils.nullSafeClassName(object) + "] to JMS message. Supported message " +
                    "payloads are: String, byte array, Map<String,?>, Serializable object.");
        }
    }
 
    public Object fromMessage(Message message) throws JMSException, MessageConversionException {
        if (message instanceof TextMessage) {
            return extractStringFromMessage((TextMessage) message);
        }
        else if (message instanceof BytesMessage) {
            return extractByteArrayFromMessage((BytesMessage) message);
        }
        else if (message instanceof MapMessage) {
            return extractMapFromMessage((MapMessage) message);
        }
        else if (message instanceof ObjectMessage) {
            return extractSerializableFromMessage((ObjectMessage) message);
        }
        else {
            return message;
        }
    }
 
    protected TextMessage createMessageForString(String text, Session session) throws JMSException {
        return session.createTextMessage(text);
    }
 
    protected BytesMessage createMessageForByteArray(byte[] bytes, Session session) throws JMSException {
        BytesMessage message = session.createBytesMessage();
        message.writeBytes(bytes);
        return message;
    }
 
    protected MapMessage createMessageForMap(Map<?, ?> map, Session session) throws JMSException {
        MapMessage message = session.createMapMessage();
        for (Map.Entry entry : map.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new MessageConversionException("Cannot convert non-String key of type [" +
                        ObjectUtils.nullSafeClassName(entry.getKey()) + "] to JMS MapMessage entry");
            }
            message.setObject((String) entry.getKey(), entry.getValue());
        }
        return message;
    }
 
    protected ObjectMessage createMessageForSerializable(Serializable object, Session session) throws JMSException {
        return session.createObjectMessage(object);
    }
 
 
    protected String extractStringFromMessage(TextMessage message) throws JMSException {
        return message.getText();
    }
 
    protected byte[] extractByteArrayFromMessage(BytesMessage message) throws JMSException {
        byte[] bytes = new byte[(int) message.getBodyLength()];
        message.readBytes(bytes);
        return bytes;
    }
 
    protected Map extractMapFromMessage(MapMessage message) throws JMSException {
        Map<String, Object> map = new HashMap<String, Object>();
        Enumeration en = message.getMapNames();
        while (en.hasMoreElements()) {
            String key = (String) en.nextElement();
            map.put(key, message.getObject(key));
        }
        return map;
    }
 
    protected Serializable extractSerializableFromMessage(ObjectMessage message) throws JMSException {
        return message.getObject();
    }
 
}

4. 事务管理

Spring提供了一个JmsTransactionManager用于对JMS ConnectionFactory做事务管理。这将允许JMS应用利用Spring的事务管理特性。JmsTransactionManager在执行本地资源事务管理时将从指定的ConnectionFactory绑定一个ConnectionFactory/Session这样的配对到线程中。JmsTemplate会自动检测这样的事务资源,并对它们进行相应操作。

在Java EE环境中,ConnectionFactory会池化Connection和Session,这样这些资源将会在整个事务中被有效地重复利用。在一个独立的环境中,使用Spring的SingleConnectionFactory时所有的事务将公用一个Connection,但是每个事务将保留自己独立的Session。

JmsTemplate可以利用JtaTransactionManager和能够进行分布式的 JMS ConnectionFactory处理分布式事务。

在Spring整合JMS的应用中,如果我们要进行本地的事务管理的话非常简单,只需要在定义对应的消息监听容器时指定其sessionTransacted属性为true,如:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="sessionTransacted" value="true"/>
</bean> 

该属性值默认为false,这样JMS在进行消息监听的时候就会进行事务控制,当在接收消息时监听器执行失败时JMS就会对接收到的消息进行回滚,对于SessionAwareMessageListener在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

这里我们可以来做一个这样的测试:我们如上配置监听在queueDestination的消息监听容器的sessionTransacted属性为true,然后把我们前面提到的消息监听器ConsumerMessageListener改成这样:

public class ConsumerMessageListener implements MessageListener {
 
    public void onMessage(Message message) {
        //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
        TextMessage textMsg = (TextMessage) message;
        System.out.println("接收到一个纯文本消息。");
        try {
            System.out.println("消息内容是:" + textMsg.getText());
            if (1 == 1) {
                throw new RuntimeException("Error");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
 
}

我们可以看到在上述代码中我们的ConsumerMessageListener在进行消息接收的时候抛出了一个RuntimeException,根据我们上面说的,因为我们已经在对应的监听容器上定义了其sessionTransacted属性为true,所以当这里抛出异常的时候JMS将对接收到的消息进行回滚,即下次进行消息接收的时候该消息仍然能够被接收到。

为了验证这一点,我们先执行一遍测试代码,往queueDestination发送一个文本消息,这个时候ConsumerMessageListener在进行接收的时候将会抛出一个RuntimeException,已经接收到的纯文本消息将进行回滚;接着我们去掉上面代码中抛出异常的语句,即ConsumerMessageListener能够正常的进行消息接收,这个时候我们再运行一次测试代码,往ConsumerMessageListener监听的queueDestination发送一条消息。

如果之前在接手时抛出了异常的那条消息已经回滚了的话,那么这个时候将能够接收到两条消息,控制台将输出接收到的两条消息的内容。具体结果有兴趣的朋友可以自己验证一下。

如果想接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。

要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="transactionManager" ref="jtaTransactionManager"/>
</bean>
    
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

当给消息监听容器指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。 

关于使用JtaTransactionManager来管理上述分布式事务,我们这里也可以来做一个试验。

首先:往Spring配置文件applicationContext.xml中添加如下配置:

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <property name="dataSource" ref="dataSource"/>
</bean>
 
<jee:jndi-lookup jndi-name="jdbc/mysql" id="dataSource"/>
<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
 
<tx:annotation-driven transaction-manager="jtaTransactionManager"/>  

我们可以看到,在这里我们引入了一个jndi数据源,定义了一个JtaTransactionManager,定义了Spring基于注解的声明式事务管理,定义了一个Spring提供的进行Jdbc操作的工具类jdbcTemplate

接下来把我们的ConsumerMessageListener改为如下形式:

public class ConsumerMessageListener implements MessageListener {
 
    @Autowired
    private TestDao testDao;
    
    private int count = 0;
    
    public void onMessage(Message message) {
            //这里我们知道生产者发送的就是一个纯文本消息,所以这里可以直接进行强制转换,或者直接把onMessage方法的参数改成Message的子类TextMessage
            TextMessage textMsg = (TextMessage) message;
            System.out.println(new Date().toLocaleString() + "接收到一个纯文本消息。");
            try {
                String text = textMsg.getText();
                System.out.println("消息内容是:" + text);
                System.out.println("当前count的值是:" + count);
                testDao.insert(text + count);
                if (count == 0) {
                    count ++;
                    throw new RuntimeException("Error! 出错啦!");
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    }
 
} 

我们可以看到,在ConsumerMessageListener中我们定义了一个实例变量count,其初始值为0;在onMessage里面,我们可以看到我们把接收到的消息内容作为参数调用了testDaoinsert方法;当count值为0,也就是进行第一次消息接收的时候会将count的值加1,同时抛出一个运行时异常。

那么我们这里要测试的就是进行第一次接收的时候testDao已经把相关内容插入数据库了,接着在onMessage里面抛出了一个异常同时count1,我们预期的结果应该是此时数据库进行回滚,同时JMS也回滚,这样JMS将继续尝试接收该消息,此时同样会调用testDaoinsert方法将内容插入数据库,再接着count已经不为0了,所以此时将不再抛出异常,JMS成功进行消息的接收,testDao也成功的将消息内容插入到了数据库。

要证明这个预期我们除了看数据库中插入的数据外,还可以看控制台的输出,正常情况控制台将输出两次消息接收的内容,且第一次时count0,第二次count1

TestDao是一个接口,其TestDaoImpl对insert的方法实现如下: 

@Transactional(readOnly=false)
public void insert(final String name) {
    jdbcTemplate.update("insert into test(name) values(?)", name);
}

这里我们使用支持JtaTransactionManager的Weblogic来进行测试,因为是Web容器,所以我们这里定义了一个Controller来进行消息的发送,具体代码如下:

@Controller
@RequestMapping("test")
public class TestController {
 
    @Autowired
    @Qualifier("queueDestination")
    private Destination destination;
    
    @Autowired
    private ProducerService producerService;
    
    @RequestMapping("first")
    public String first() {
        producerService.sendMessage(destination, "你好,现在是:" + new Date().toLocaleString());
        return "/test/first";
    }
    
}

接下来就是启用Weblogic服务器,进入其控制台,定义一个名叫jdbc/mysql的JNDI数据源,然后把该项目部署到Weblogic服务器上并进行启动。接下来我们就可以访问/test/first.do访问到上述first方法。之后控制台会输出如下信息:

img

我们可以看到当count0时接收了一次,并随后抛出了异常,之后count1又接收了一次,这说明在count0时抛出异常后我们的JMS进行回滚了,那么我们的数据库是否有进行回滚呢?接着我们来看数据库中的内容:

img

我们可以看到数据库表中只有一条记录,而且最后一位表示count的值的为1,这说明在JMS进行消息接收抛出异常时我们的数据库也回滚了。关于使用JtaTransactionManager进行分布式事务管理的问题就说到这里了,有兴趣的朋友可以自己试验一下。

转载

文章转载自:Elim的博客

Last Modified: January 31, 2023
Archives QR Code Tip
QR Code for this page
Tipping QR Code
Leave a Comment

3 Comments
  1. 小白 小白

    @(乖) 来学习下这个

  2. 前段时间做财企接口,刚用完这个,感觉还不错的。

  3. @(滑稽)诈尸!