分布式事务框架分析
事务为什么要分布式
-
什么是事务
◆ 事务指的是一 系列业务操作,只能同时成功或同时失败
◆ 传统事务有4个主要特性:原子性、一致性、隔离性、持久性 -
微服务化带来的挑战
◆ 在传统单体应用中,事务在本地即可完成
◆ 随着后端架构的微服务化,事务无法在本地完成
◆ 所以需要将事务“分布式化” -
传统单体应用
◆ 在传统单体应用中,事务在本地即可完成
-
微服务应用
◆ 随着后端架构的微服务化,事务无法在本地完成
◆ 所以需要将事务"分布式化"
事务的前提理论
-
分布式框架理论 ACID
事务正确执行的四个基本要素
◆ 原子性(Atomicity)
◆ 一致性(Consistency)
◆ 隔离性(Isolation)
◆ 持久性(Durability) -
分布式框架理论 CAP
一致性、可用性、分区容忍性不可能三者兼顾
◆ 一致性(Consistency)
◆ 可用性(Availability)
◆ 分区容忍性 (Partition tolerance) -
分布式框架理论 BASE
由于CAP无法同时满足,基于I程实际,提出了BASE理论
◆ Basically Available (基本可用)
◆ Soft state (软状态)
◆ Eventually consistent (最终一致性)
分布式事务的取舍
◆ ACID往往针对传统本地事务,分布式事务无法满足原子性和隔离性,需要舍弃传统ACID理论
◆ 基于BASE理论,业务状态不需要在微服务系统内强一致
◆ 基于BASE理论,订单状态要做到最终一致性即可
◆ 为了做到最终一致性, 要保证消息不丢失,发送处理的流程要有重试机制,重试多次失败后要有告警
分布式事务框架设计
根据上述分析,分布式事务框架应该包含以下部分
◆ 发送失败重试
◆ 消费失败重试
◆ 死信告警
数据表设计
分布式事务框架搭建
要用到的相关技术:
◆ 声明ConnectionFactory、RabbitAdmin、RabbitListenerContainerFactory、RabbitTemplate
◆ 声明枚举、PO、 开发dao层
◆ 声明定时任务
分布式事务相关说明
-
消息发送失败重试
◆ 发送消息前消息持久化
◆ 发送成功时删除消息
◆ 定时巡检未发送成功消息、重试发送
-
消息消费失败重试
◆ 收到消息时先进行持久化
◆ 消息处理成功,消费端确认(ACK),删除消息
◆ 消息处理失败,延时,不确认消息(NACK),记录次数
◆ 再次处理消息
-
死信消息告警
◆ 声明死信队列、交换机、绑定
◆ 普通队列加入死信设置
◆ 监听到死信,持久化、告警
步骤如下
该消息发送失败重试、消息消费失败重试、死信消息告警的事务框架功能我们写在一个统一的包下面,以便于以后的复用。包名为moodymq。
目录结构如下:
开发为在RabbitMQ学习笔记(四)——RabbitMQ与SpringBoot适配的源码基础上新增代码。
1. 新建数据表
DROP TABLE IF EXISTS `trans_message`; CREATE TABLE `trans_message` ( `id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息ID', `service` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '服务名称', `type` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '消息类型', `exchange` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '交换机', `routing_Key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '路由键', `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '队列', `sequence` int(11) NULL DEFAULT NULL COMMENT '序号', `payload` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '消息内容', `date` datetime(0) NULL DEFAULT NULL COMMENT '时间', PRIMARY KEY (`id`, `service`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. 修改配置文件
application.properties增加moodymq
#订单微服务配置类 server.port=8080 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driver-class-name=com.mysql.jdbc.Driver rabbitmq.host=192.168.137.138 rabbitmq.port=5672 rabbitmq.username=guest rabbitmq.password=guest rabbitmq.exchange=exchange.food spring.rabbitmq.addresses=192.168.137.138 spring.rabbitmq.host=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 自动ack spring.rabbitmq.listener.direct.acknowledge-mode=auto moodymq.service=orderService moodymq.host=192.168.137.138 moodymq.port=5672 moodymq.username=guest moodymq.password=guest moodymq.vhost=/ # 重复消费最多五次 moodymq.resendTimes=5 # 重复消费间隔时长 moodymq.resendFreq=5000
3. 新建状态枚举、Po和Dao层
TransMessageType.java
package cn.kt.food.orderservicemanager.moodymq.enummeration; public enum TransMessageType { SEND, RECEIVE, DEAD; }
TransMessagePO.java
package cn.kt.food.orderservicemanager.moodymq.po; import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType; import lombok.Getter; import lombok.Setter; import lombok.ToString; import java.util.Date; @Getter @Setter @ToString public class TransMessagePO { private String id; private String service; private TransMessageType type; private String exchange; private String routingKey; private String queue; private Integer sequence; private String payload; private Date date; }
TransMessageDao.java
package cn.kt.food.orderservicemanager.moodymq.dao; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; @Mapper @Repository public interface TransMessageDao { @Insert("INSERT INTO trans_message (id, type, service, " + "exchange, routing_key, queue, sequence, payload," + "date) " + "VALUES(#{id}, #{type}, #{service},#{exchange}," + "#{routingKey},#{queue},#{sequence}, #{payload},#{date})") void insert(TransMessagePO transMessagePO); @Update("UPDATE trans_message set type=#{type}, " + "service=#{service}, exchange =#{exchange},"+ "routing_key =#{routingKey}, queue =#{queue}, " + "sequence =#{sequence}, payload =#{payload}, " + "date =#{date} " + "where id=#{id} and service=#{service}") void update(TransMessagePO transMessagePO); @Select("SELECT id, type, service, exchange, " + "routing_key routingKey, queue, sequence, " + "payload, date " + "FROM trans_message " + "where id=#{id} and service=#{service}") TransMessagePO selectByIdAndService(@Param("id") String id, @Param("service") String service); @Select("SELECT id, type, service, exchange, " + "routing_key routingKey, queue, sequence, " + "payload, date " + "FROM trans_message " + "WHERE type = #{type} and service = #{service}") List<TransMessagePO> selectByTypeAndService( @Param("type") String type, @Param("service") String service); @Delete("DELETE FROM trans_message " + "where id=#{id} and service=#{service}") void delete(@Param("id") String id, @Param("service") String service); }
4. 发送消息封装send
TransMessageSender.java
package cn.kt.food.orderservicemanager.moodymq.sender; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class TransMessageSender { @Autowired RabbitTemplate rabbitTemplate; @Autowired TransMessageService transMessageService; // 发送消息封装 public void send(String exchange, String routingKey, Object payload) { log.info("send(): exchange:{} routingKey:{} payload:{}", exchange, routingKey, payload); try { ObjectMapper mapper = new ObjectMapper(); String payloadStr = mapper.writeValueAsString(payload); System.out.println(payloadStr); // 发送前暂存消息 TransMessagePO transMessagePO = transMessageService.messageSendReady( exchange, routingKey, payloadStr ); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); // 封装消息 Message message = new Message(payloadStr.getBytes(), messageProperties); message.getMessageProperties().setMessageId(transMessagePO.getId()); // 发送消息 rabbitTemplate.convertAndSend(exchange, routingKey, message, new CorrelationData(transMessagePO.getId())); log.info("message sent, ID:{}", transMessagePO.getId()); } catch (Exception e) { log.error(e.getMessage(), e); } } }
5. 新建service层和实现类
TransMessageService.java
package cn.kt.food.orderservicemanager.moodymq.service; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import java.util.List; public interface TransMessageService { /** * 发送前暂存消息 * * @param exchange exchange * @param routingKey routingKey * @param body body * @return TransMessagePO */ TransMessagePO messageSendReady(String exchange, String routingKey, String body); /** * 设置消息发送成功,需要把消息删除 * * @param id 消息ID */ void messageSendSuccess(String id); /** * 设置消息返回,将消息持久化 * * @param id id * @param exchange exchange * @param routingKey routingKey * @param body body * @return TransMessagePO */ TransMessagePO messageSendReturn( String id, String exchange, String routingKey, String body); /** * 查询应发未发消息(之前发送出错的消息,还需要重发,未到告警) * * @return List<TransMessagePO> */ List<TransMessagePO> listReadyMessages(); /** * 记录消息发送次数 * * @param id id */ void messageResend(String id); /** * 消息重发多次,放弃 * * @param id id */ void messageDead(String id); /** * 保存监听到的死信消息 * @param id * @param exchange * @param routingKey * @param queue * @param body */ void messageDead(String id, String exchange, String routingKey, String queue, String body); /** * 消息消费前保存 * * @param id * @param exchange * @param routingKey * @param queue * @param body * @return */ TransMessagePO messageReceiveReady( String id, String exchange, String routingKey, String queue, String body); /** * 消息消费成功 * * @param id */ void messageReceiveSuccess(String id); }
TransMessageServiceImpl.java
package cn.kt.food.orderservicemanager.moodymq.service; import cn.kt.food.orderservicemanager.moodymq.dao.TransMessageDao; import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.Date; import java.util.List; import java.util.UUID; @Service public class TransMessageServiceImpl implements TransMessageService { @Autowired TransMessageDao transMessageDao; @Value("${moodymq.service}") String serviceName; @Override public TransMessagePO messageSendReady(String exchange, String routingKey, String body) { final String messageId = UUID.randomUUID().toString(); TransMessagePO transMessagePO = new TransMessagePO(); transMessagePO.setId(messageId); transMessagePO.setService(serviceName); transMessagePO.setExchange(exchange); transMessagePO.setRoutingKey(routingKey); transMessagePO.setPayload(body); transMessagePO.setDate(new Date()); transMessagePO.setSequence(0); transMessagePO.setType(TransMessageType.SEND); transMessageDao.insert(transMessagePO); return transMessagePO; } @Override public void messageSendSuccess(String id) { transMessageDao.delete(id, serviceName); } @Override public TransMessagePO messageSendReturn(String id, String exchange, String routingKey, String body) { TransMessagePO selectByIdAndService = transMessageDao.selectByIdAndService(id, serviceName); if (selectByIdAndService == null) { TransMessagePO transMessagePO = new TransMessagePO(); transMessagePO.setId(id); transMessagePO.setService(serviceName); transMessagePO.setExchange(exchange); transMessagePO.setRoutingKey(routingKey); transMessagePO.setPayload(body); transMessagePO.setDate(new Date()); transMessagePO.setSequence(0); transMessagePO.setType(TransMessageType.SEND); transMessageDao.insert(transMessagePO); return transMessagePO; } else { return selectByIdAndService; } // return messageSendReady(exchange, routingKey, body); } @Override public List<TransMessagePO> listReadyMessages() { return transMessageDao.selectByTypeAndService( TransMessageType.SEND.toString(), serviceName ); } @Override public void messageResend(String id) { TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName); transMessagePO.setSequence(transMessagePO.getSequence() + 1); transMessageDao.update(transMessagePO); } @Override public void messageDead(String id) { TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName); transMessagePO.setType(TransMessageType.DEAD); transMessageDao.update(transMessagePO); } @Override public void messageDead(String id, String exchange, String routingKey, String queue, String body) { TransMessagePO transMessagePO = new TransMessagePO(); transMessagePO.setId(id); transMessagePO.setService(serviceName); transMessagePO.setExchange(exchange); transMessagePO.setRoutingKey(routingKey); transMessagePO.setQueue(queue); transMessagePO.setPayload(body); transMessagePO.setDate(new Date()); transMessagePO.setSequence(0); transMessagePO.setType(TransMessageType.DEAD); transMessageDao.insert(transMessagePO); } @Override public TransMessagePO messageReceiveReady( String id, String exchange, String routingKey, String queue, String body) { TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName); if (null == transMessagePO) { // 说明是第一次消费 transMessagePO = new TransMessagePO(); transMessagePO.setId(id); transMessagePO.setService(serviceName); transMessagePO.setExchange(exchange); transMessagePO.setRoutingKey(routingKey); transMessagePO.setQueue(queue); transMessagePO.setPayload(body); transMessagePO.setDate(new Date()); transMessagePO.setSequence(0); transMessagePO.setType(TransMessageType.RECEIVE); transMessageDao.insert(transMessagePO); } else { // 否则消费次数 + 1 transMessagePO.setSequence(transMessagePO.getSequence() + 1); transMessageDao.update(transMessagePO); } return transMessagePO; } @Override public void messageReceiveSuccess(String id) { // 消费成功后删除消息 transMessageDao.delete(id, serviceName); } }
6. 新建config配置RabbitAdmin和RabbitTemplate实现消息的监听和确认逻辑
消息监听使用手动ack
消息确认机制消息投递至交换机失败进行消息重发
MoodyRabbitConfig.java
package cn.kt.food.orderservicemanager.moodymq.config; import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class MoodyRabbitConfig { @Autowired TransMessageService transMessageService; @Value("${moodymq.host}") String host; @Value("${moodymq.port}") int port; @Value("${moodymq.username}") String username; @Value("${moodymq.password}") String password; @Value("${moodymq.vhost}") String vhost; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); // CORRELATED:发送消息的成功还是失败需要有id的参数,因为确认消息是异步的,需要确认哪条消息被确认, // 体现在发送消息前持久化时设置id:message.getMessageProperties().setMessageId(transMessagePO.getId()); connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED); // 消息无法路由的时候需要设置消息返回 connectionFactory.setPublisherReturns(true); connectionFactory.createConnection(); return connectionFactory; } @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin admin = new RabbitAdmin(connectionFactory); admin.setAutoStartup(true); return admin; } /* 配置消费端消息监听 */ @Bean public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setConcurrentConsumers(3); factory.setMaxConcurrentConsumers(10); // 手动ack factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } /* 消息是否路由的消息确认机制 */ @Bean public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); // 消息确认回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { log.info("correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause); // 如果消息确认接收 if (ack && null != correlationData) { String messageId = correlationData.getId(); log.info("消息已经正确投递到交换机, id:{}", messageId); transMessageService.messageSendSuccess(messageId); } else { // 如果消息确认接收失败,则消息保留,等待下次重发 log.error("消息投递至交换机失败,correlationData:{}", correlationData); } }); // 当消息未进入队列时回调 rabbitTemplate.setReturnsCallback(returnedMessage -> { log.error("消息无法路由!message:{}, replyCode:{} replyText:{} exchange:{} routingKey:{}", returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey()); transMessageService.messageSendReturn( returnedMessage.getMessage().getMessageProperties().getMessageId(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), new String(returnedMessage.getMessage().getBody()) ); }); return rabbitTemplate; } }
7. 新建抽象类实现ChannelAwareMessageListener完成消息监听
监听消息的接收和业务执行是否异常,如果消息处理异常,则消息重回队列
AbstractMessageListener.java
package cn.kt.food.orderservicemanager.moodymq.listener; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService; import com.fasterxml.jackson.core.JsonProcessingException; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import java.io.IOException; @Slf4j public abstract class AbstractMessageListener implements ChannelAwareMessageListener { @Autowired TransMessageService transMessageService; @Value("${moodymq.resendTimes}") Integer resendTimes; public abstract void receviceMessage(Message message) throws JsonProcessingException; @Override public void onMessage(Message message, Channel channel) throws IOException, InterruptedException { MessageProperties messageProperties = message.getMessageProperties(); // deliveryTag:跟消息接收确认有关的数字 long deliveryTag = messageProperties.getDeliveryTag(); // 持久化接收到的消息 log.info("收到的消息{}", new String(message.getBody())); TransMessagePO transMessagePO = transMessageService.messageReceiveReady( messageProperties.getMessageId(), messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), messageProperties.getConsumerQueue(), new String(message.getBody()) ); log.info("收到消息{}, 消费次数{}", messageProperties.getMessageId(), transMessagePO.getSequence()); try { // 该方法让业务去执行,这里抓异常 receviceMessage(message); // 消息处理完成 channel.basicAck(deliveryTag, false); transMessageService.messageReceiveSuccess(messageProperties.getMessageId()); } catch (Exception e) { // 消息处理异常 log.error(e.getMessage(), e); // 判断该消息的消费次数 if (transMessagePO.getSequence() >= resendTimes) { // 消费次数超限,拒收消息 channel.basicReject(deliveryTag, false); } else { // 消息重回队列 Thread.sleep((long) (Math.pow(2, transMessagePO.getSequence())) * 1000); channel.basicNack(deliveryTag, false, true); } } } }
8. 配置死信消息告警
声明死信交换机、队列和绑定
DlxConfig.java
package cn.kt.food.orderservicemanager.moodymq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConditionalOnProperty("moodymq.dlxEnabled") public class DlxConfig { /* * 声明死信交换机、队列和绑定 */ @Bean public TopicExchange dlxExchange() { return new TopicExchange("exchange.dlx"); } @Bean public Queue dlxQueue() { return new Queue("queue.dlx", true, false, false); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#"); } }
监听死信消息
DlxListener.java
package cn.kt.food.orderservicemanager.moodymq.listener; import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @Component @Slf4j //配置是不是需要监听死信:@ConditionalOnProperty实现是通过havingValue与配置文件中的值对比,返回为true则配置类生效,反之失效. @ConditionalOnProperty("moodymq.dlxEnabled") public class DlxListener implements ChannelAwareMessageListener { @Autowired TransMessageService transMessageService; @Override public void onMessage(Message message, Channel channel) throws Exception { String messageBody = new String(message.getBody()); log.error("dead letter! message:{}", message); //发邮件、打电话、发短信 //XXXXX() MessageProperties messageProperties = message.getMessageProperties(); transMessageService.messageDead( messageProperties.getMessageId(), messageProperties.getReceivedExchange(), messageProperties.getReceivedRoutingKey(), messageProperties.getConsumerQueue(), messageBody ); // 单条确认 channel.basicAck(messageProperties.getDeliveryTag(), false); } }
9. 配置定时任务
每隔5秒巡检异常消息
ResendTask.java
package cn.kt.food.orderservicemanager.moodymq.task; import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType; import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO; import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; @EnableScheduling @Configuration @Component @Slf4j public class ResendTask { @Autowired TransMessageService transMessageService; @Autowired RabbitTemplate rabbitTemplate; @Value("${moodymq.resendTimes}") Integer resendTimes; // 在配置类中取出来 @Scheduled(fixedDelayString = "${moodymq.resendFreq}") public void resendMessage(){ log.info("resendMessage() invoked."); List<TransMessagePO> messagePOS = transMessageService.listReadyMessages(); log.info("resendMessage(): messagepos:{}", messagePOS); for (TransMessagePO po: messagePOS) { log.info("resendMessage(): po:{}", po); // 过滤dead消息 if(po.getSequence() > resendTimes){ log.error("resend too many times!"); transMessageService.messageDead(po.getId()); continue; } // 封装和发送消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); Message message = new Message(po.getPayload().getBytes(), messageProperties); message.getMessageProperties().setMessageId(po.getId()); rabbitTemplate.convertAndSend( po.getExchange(), po.getRoutingKey(), message, new CorrelationData(po.getId())); log.info("message sent, ID:{}", po.getId()); // 消息重发,发送次数+1 transMessageService.messageResend(po.getId()); } } }
10. 改造moddymq包外的业务代码
- 继承moddymq包中抽象出来的监听方法
- 注解绑定交换机队列配置改用RabbitConfig配置类中使用@Bean配置
- 监听消息的handMessage改用抽象类的方法receviceMessage进行重写
- 在moddymq包外的业务代码中使用包内封装的发送方法
具体实现详情看下面源码
源码下载
https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_3
小结
- 消息发送失败重试,消息消费失败重试,死信消息告警只是有效的保证rabbitMQ消息的事务一致性,有效的解决了消息失败的结果。
- 在实际项目中可以把开发的分布式事务框架包moddymq新建另外一个项目,并打成jar包,统一使用规范供多微服务模块使用
- 本moddymq中并无注明给死信队列queue.dlx发送消息的场景,实际开发中可以定时将状态为DEAD的消息发送至死信队列进行死信告警。告警方法方法已给出,但具体告警逻辑可以根据实际场景需要进行完善。
- 在源代码中,沿用了RabbitMQ快速上手中的订单微服务的案例,改造使用了该分布式的事务框架。