1. ActiveMQ 的消息模型分为 queue 和 topic 两种。
2. 下面先介绍 queue，使用 Spring 进行集成。
build.gradle
// One declaration per line: when chained on a single line, Groovy parses
// `compile "a" compile "b"` as one command expression instead of separate
// dependency declarations.

// Spring JMS support and the ActiveMQ core library
compile "org.springframework:spring-jms:3.2.1.RELEASE"
compile "org.apache.activemq:activemq-core:5.7.0"

// Logback modules
compile "ch.qos.logback:logback-core:1.0.9"
compile "ch.qos.logback:logback-classic:1.0.9"
compile "ch.qos.logback:logback-access:1.0.9"

// JUnit, test configuration only
testCompile group: 'junit', name: 'junit', version: '4.10'
applicationContext.xml
<description>Spring公共配置</description>

<!-- Register beans automatically via annotations and check that @Required / @Autowired properties have been injected -->
<context:component-scan base-package="net.tt64"/>

<!-- ActiveMQ connection factory -->
<!-- NOTE(review): the vm:// URL presumably selects ActiveMQ's in-JVM transport with a broker named "cheyoushuo"; confirm against the ActiveMQ docs -->
<bean id="advancedConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://cheyoushuo" />
    <property name="useAsyncSend" value="true" />
</bean>

<!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
<bean id="advancedCachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="advancedConnectionFactory" />
    <property name="sessionCacheSize" value="10" />
</bean>

<!-- Queue definition (queue name: test.jms) -->
<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="test.jms" />
</bean>

<!-- Spring JMS template, used for sending -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="advancedCachingConnectionFactory" />
    <!-- Required for the deliveryMode, priority and timeToLive settings to take effect -->
    <property name="explicitQosEnabled" value="true" />
    <!-- Delivery mode: true = PERSISTENT (also the default); use false for NON_PERSISTENT -->
    <property name="deliveryPersistent" value="true" />
    <!-- Message priority (default is 4) -->
    <property name="priority" value="9" />
</bean>

<!-- Container that receives queue messages asynchronously and hands them to the jmsService listener -->
<!-- NOTE(review): no jmsService bean is declared here; presumably it is registered by the component scan above - confirm -->
<bean id="advancedQueueContainer" depends-on="jmsService" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="advancedConnectionFactory" />
    <property name="destination" ref="notifyQueue" />
    <property name="messageListener" ref="jmsService" />
    <!-- Exactly one consumer: initial and maximum are both 1, so messages are handled one at a time -->
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />
    <!-- Use CLIENT_ACKNOWLEDGE as the session acknowledge mode -->
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>
jms sender
/** JMS template used to publish messages; injected by Spring. */
@Autowired
private JmsTemplate jmsTemplate;

/** Destination that messages are sent to; injected by Spring. */
@Autowired
private Destination dest;

/**
 * Sends the given object to {@code dest} as a JMS {@link ObjectMessage}.
 *
 * @param obj the serializable payload to wrap in the message
 */
public void send(final Serializable obj) {
    // Build the creator first, then hand it to the template.
    MessageCreator objectMessageCreator = new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createObjectMessage(obj);
        }
    };
    jmsTemplate.send(dest, objectMessageCreator);
}
jms receiver
/**
 * Handles one incoming JMS message.
 *
 * <p>Only {@link ObjectMessage} instances are processed; any other message
 * type is ignored. The payload's string form is printed before and after a
 * one-second pause that stands in for real processing.
 *
 * @param message the message delivered to this listener
 */
@Override
public void onMessage(Message message) {
    if (!(message instanceof ObjectMessage)) {
        return;
    }
    ObjectMessage om = (ObjectMessage) message;
    try {
        // String.valueOf tolerates a null payload instead of throwing a NullPointerException.
        System.out.println(String.valueOf(om.getObject()) + " recieved");
        // sleep is static; call it on the class rather than through an instance.
        Thread.sleep(1000);
        System.out.println(String.valueOf(om.getObject()) + " process over");
    } catch (InterruptedException ex) {
        // Restore the interrupt flag so the thread that runs this listener can still see the interruption.
        Thread.currentThread().interrupt();
    } catch (JMSException ex) {
        // Report the failure instead of dropping it silently; do not rethrow,
        // so the listener keeps its original best-effort behaviour.
        System.err.println("failed to read ObjectMessage payload: " + ex);
    }
}
Main
/**
 * Demo entry point: loads {@code applicationContext.xml} from the classpath,
 * looks up the {@code JmsSender} bean and sends 100 string messages
 * ("message0" through "message99"), printing one line before each send.
 *
 * <p>The application context is not closed here.
 * NOTE(review): presumably so the listener container can keep consuming
 * after the sends finish - confirm.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
    JmsSender jmsSender = context.getBean(JmsSender.class);
    int index = 0;
    while (index < 100) {
        System.out.println("message " + index + "send");
        jmsSender.send("message" + index);
        index++;
    }
}
这里将 concurrentConsumers 和 maxConcurrentConsumers 都设为 1，即始终只有一个消费者逐条处理消息，这就是单任务队列。