activemq 消息消费失败之后如何重新消费

首先给大家推荐一下我老师大神的人工智能教学网站。教学不仅零基础,通俗易懂,而且非常风趣幽默,还时不时有内涵黄段子!点这里可以跳转到网站

在不开启事务的情况下,采用的是应答模式 4(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE),即消费一条、应答一条。

这时候如果消费失败了,由于没有配置死信队列,消息不会被再次消费,而是堆积在队列中。那么怎样才能让消息重新被消费呢?

由于项目中的应用场景,有个方案启动和停止的功能:方案启动时启动监听,方案停止时停止监听。

具体实现代码如下

public class MqService { 	private JmsTemplate jmsTemplate; 	private CachingConnectionFactory cachingProductConnectionFactory; 	private CachingConnectionFactory cachingConsumersConnectionFactory; 	private static final String ACTIVEMQQUEUE_OPTIMIZATION = "";// 性能优化参数																// consumer.prefetchSize预加载消息																// 20条																// ?consumer.prefetchSize=20 	/**	 * 发送消息	 * 	 * @param ryzhMessage	 */	public void send(final RyzhMessage ryzhMessage) {				// 得到MQ工具类		RyzhMqHolder hodler = mqHolders.get(ryzhMessage.getShcemeId());		// 发送信息		jmsTemplate.setConnectionFactory(cachingProductConnectionFactory);		jmsTemplate.send(hodler.getDestination(), new MessageCreator() {			public Message createMessage(Session session) throws JMSException {				ObjectMessage objectMessage = session.createObjectMessage();				objectMessage.setObject(ryzhMessage);				return objectMessage;			}		});	} 	/**	 * 监听	 * 	 * @param schemeId	 * @param receiver	 */	public void startListen(RyzhScheme scheme) {		RyzhMqHolder holder = mqHolders.get(scheme.getSchemeId());		try {			if (holder == null) {				RyzhConsumer consumer = (RyzhConsumer) SpringBeanUtils.getBean("ryzhConsumer");				consumer.setScheme(scheme);				logger.info("RyzhMqService-启动消息队列监听器 schemeId=" + scheme.getSchemeId());				// 如果是为空				holder = new RyzhMqHolder();				// 得到整合方案的配置				TRyzhFa tRyzhFa = ((RyzhConsumer) consumer).getScheme().getZhfa();				// 得到消息队列配置的信息				String xxdlJson = tRyzhFa.getXxdlpz();				// json转map				RyzhMqConfig ryzhMqConfig = JSON.parseObject(xxdlJson, RyzhMqConfig.class);				// 设置方案ID				holder.setSchemeId(scheme.getSchemeId());				// 设置目的地				ActiveMQQueue destination = new ActiveMQQueue(RyzhMqConfig.QUEUE_NAME + scheme.getSchemeId() + ACTIVEMQQUEUE_OPTIMIZATION);				holder.setDestination(destination);				// 创建监听器				DefaultMessageListener listener = new DefaultMessageListener();				// 给监听器设置消费者				listener.setReceiver(consumer);				// 将监听器保存在holder中				holder.setListener(listener);				// 
创建监听容器				DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();				// 监听容器属性的配置				listenerContainer.setConnectionFactory(cachingConsumersConnectionFactory);				// 设置目的地				listenerContainer.setDestination(destination);				// 设置监听器				listenerContainer.setMessageListener(listener);				// 设置消费者集群数				listenerContainer.setConcurrentConsumers(ryzhMqConfig.getConcurrentConsumers());				// 设置监听队列还是主题 默认是队列				listenerContainer.setPubSubDomain(RyzhMqConfig.PUBSUB_DOMAIN);				listenerContainer.setPubSubNoLocal(false);				// listenerContainer.setAcceptMessagesWhileStopping(true);				// 设置应答模式 默认是4				listenerContainer.setSessionAcknowledgeMode(RyzhMqConfig.SESSION_ACKNOWLEDGEMODE);				// 设置是否启动事物 默认不开启				listenerContainer.setSessionTransacted(RyzhMqConfig.SESSION_TRANSACTED);				// 将监听容器保存在holder中				holder.setListenerContainer(listenerContainer);				// 将holder缓存在map中				mqHolders.put(scheme.getSchemeId(), holder);				// 初始化容器				holder.getListenerContainer().initialize();				// 启动监听				holder.getListenerContainer().start();				logger.info("RyzhMqService-消息队列监听器启动成功 schemeId=" + scheme.getSchemeId());			}		} catch (Exception e) {			logger.error("RyzhMqService-消息队列监听器启动失败 schemeId=" + scheme.getSchemeId(), e);		}	} 	public void stopListen(RyzhScheme scheme) {		RyzhMqHolder ryzhMqHolder = mqHolders.get(scheme.getSchemeId());		// 停止监听		ryzhMqHolder.getListenerContainer().destroy();		// 移除缓存		mqHolders.remove(scheme.getSchemeId());	} 	/**	 * 取得MQHolder	 * 	 * @param ryzhMessage	 * @return	 */ 	public Map<String, RyzhMqHolder> getMqHolders() {		return mqHolders;	} 	public void setMqHolders(Map<String, RyzhMqHolder> mqHolders) {		this.mqHolders = mqHolders;	} 	public JmsTemplate getJmsTemplate() {		return jmsTemplate;	} 	public void setJmsTemplate(JmsTemplate jmsTemplate) {		this.jmsTemplate = jmsTemplate;	} 	public CachingConnectionFactory getCachingProductConnectionFactory() {		return cachingProductConnectionFactory;	} 	public void 
setCachingProductConnectionFactory(CachingConnectionFactory cachingProductConnectionFactory) {		this.cachingProductConnectionFactory = cachingProductConnectionFactory;	} 	public CachingConnectionFactory getCachingConsumersConnectionFactory() {		return cachingConsumersConnectionFactory;	} 	public void setCachingConsumersConnectionFactory(CachingConnectionFactory cachingConsumersConnectionFactory) {		this.cachingConsumersConnectionFactory = cachingConsumersConnectionFactory;	} }

具体的xml配置如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:jee="http://www.springframework.org/schema/jee"
	xmlns:tx="http://www.springframework.org/schema/tx" xmlns:jpa="http://www.springframework.org/schema/data/jpa"
	xmlns:lang="http://www.springframework.org/schema/lang"
	xsi:schemaLocation="
		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
		http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
		http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd"
	default-autowire="byName">

	<!-- Template used by the producer side to send messages. -->
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="cachingProductConnectionFactory"></property>
		<!-- Persistent delivery -->
		<property name="deliveryMode" value="2"></property>
		<!-- Transactions disabled -->
		<property name="sessionTransacted" value="false"></property>
		<!-- Acknowledge mode 4 = ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE -->
		<property name="sessionAcknowledgeMode" value="4"></property>
	</bean>

	<bean id="ryzhMqService" class="com.dragonsoft.rygl.mq.RyzhMqService"
		scope="singleton">
		<property name="cachingConsumersConnectionFactory" ref="cachingConsumersConnectionFactory"></property>
		<property name="cachingProductConnectionFactory" ref="cachingProductConnectionFactory"></property>
		<property name="jmsTemplate" ref="jmsTemplate" />
	</bean>

	<bean id="defaultRyzhProduceProcessor"
		class="com.dragonsoft.rygl.ryzh.service.produce.DefaultRyzhProduceProcessor"
		scope="prototype">
		<property name="mgService" ref="ryzhMqService"></property>
		<property name="jdbcTemplate" ref="jdbcTemplate"></property>
		<property name="ip" value="${jmx.ip}"></property>
		<property name="port" value="${jmx.port}"></property>
		<property name="dataSourceManager" ref="dataSourceManager"></property>
	</bean>

	<!-- Caching connection factory dedicated to producers -->
	<bean id="cachingProductConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="connectionFactory"></property>
		<property name="reconnectOnException" value="true"></property>
		<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
	</bean>

	<!-- Caching connection factory dedicated to consumers -->
	<bean id="cachingConsumersConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
		<property name="targetConnectionFactory" ref="connectionFactory"></property>
		<property name="reconnectOnException" value="true"></property>
		<property name="sessionCacheSize" value="${jms.sessionCacheSize}"></property>
		<!-- Key settings (highlighted in red in the original article): consumer and
		     producer caching is switched off here. Per the article, a cached consumer
		     would otherwise prevent the listener from being stopped. -->
		<property name="cacheConsumers" value="false"></property>
		<property name="cacheProducers" value="false"></property>
	</bean>

	<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
		<property name="brokerURL" value="${jms.brokerURL}" />
		<property name="userName" value="${jms.userName}"></property>
		<property name="password" value="${jms.password}"></property>
		<property name="useAsyncSend" value="${jms.useAsyncSend}"></property>
		<!--
		Optional redelivery policy, currently disabled.
		maximumRedeliveries: redeliver at most 5 times.
		initialRedeliveryDelay: 100 ms before the first redelivery.
		<property name="redeliveryPolicy">
			<bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
				<property name="useExponentialBackOff" value="true"/>
				<property name="maximumRedeliveries" value="5"/>
				<property name="initialRedeliveryDelay" value="100"/>
				<property name="backOffMultiplier" value="2.0"/>
			</bean>
		</property>
		-->
	</bean>
</beans>
注意红色部分(cacheConsumers 和 cacheProducers 均设为 false):需要关闭缓存,否则监听(消费者)会被缓存,无法 stop。

通过以上方式,就可以在不配置死信队列的情况下重新消费消息。(其实原理就是重启了监听容器)

点这里可以跳转到人工智能网站

发表评论