RabbitMQ

RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架

Nick · 6月7日 · 2022年 · 本文22166字 · 阅读56分钟342

分布式事务框架分析

事务为什么要分布式

  1. 什么是事务
    ◆ 事务指的是一 系列业务操作,只能同时成功或同时失败
    ◆ 传统事务有4个主要特性:原子性、一致性、隔离性、持久性

  2. 微服务化带来的挑战
    ◆ 在传统单体应用中,事务在本地即可完成
    ◆ 随着后端架构的微服务化,事务无法在本地完成
    ◆ 所以需要将事务“分布式化”

  3. 传统单体应用
    ◆ 在传统单体应用中,事务在本地即可完成
    RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

  4. 微服务应用
    ◆ 随着后端架构的微服务化,事务无法在本地完成
    RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

◆ 所以需要将事务"分布式化"
RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

事务的前提理论

  1. 分布式框架理论 ACID
    事务正确执行的四个基本要素
    ◆ 原子性(Atomicity)
    ◆ 一致性(Consistency)
    ◆ 隔离性(Isolation)
    ◆ 持久性(Durability)

  2. 分布式框架理论 CAP
    一致性、可用性、分区容忍性不可能三者兼顾
    ◆ 一致性(Consistency)
    ◆ 可用性(Availability)
    ◆ 分区容忍性 (Partition tolerance)

  3. 分布式框架理论 BASE
    由于CAP无法同时满足,基于I程实际,提出了BASE理论
    ◆ Basically Available (基本可用)
    ◆ Soft state (软状态)
    ◆ Eventually consistent (最终一致性)

分布式事务的取舍

◆ ACID往往针对传统本地事务,分布式事务无法满足原子性和隔离性,需要舍弃传统ACID理论
◆ 基于BASE理论,业务状态不需要在微服务系统内强一致
◆ 基于BASE理论,订单状态要做到最终一致性即可
◆ 为了做到最终一致性, 要保证消息不丢失,发送处理的流程要有重试机制,重试多次失败后要有告警

分布式事务框架设计

根据上述分析,分布式事务框架应该包含以下部分
◆ 发送失败重试
◆ 消费失败重试
◆ 死信告警

数据表设计
RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

分布式事务框架搭建

要用到的相关技术:
◆ 声明ConnectionFactory、RabbitAdmin、RabbitListenerContainerFactory、RabbitTemplate
◆ 声明枚举、PO、 开发dao层
◆ 声明定时任务

分布式事务相关说明

  1. 消息发送失败重试
    ◆ 发送消息前消息持久化
    ◆ 发送成功时删除消息
    ◆ 定时巡检未发送成功消息、重试发送
    RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

  2. 消息消费失败重试
    ◆ 收到消息时先进行持久化
    ◆ 消息处理成功,消费端确认(ACK),删除消息
    ◆ 消息处理失败,延时,不确认消息(NACK),记录次数
    ◆ 再次处理消息
    RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

  3. 死信消息告警
    ◆ 声明死信队列、交换机、绑定
    ◆ 普通队列加入死信设置
    ◆ 监听到死信,持久化、告警
    RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客

步骤如下

该消息发送失败重试、消息消费失败重试、死信消息告警的事务框架功能我们写在一个统一的包下面,以便于以后的复用。包名为moodymq。
目录结构如下:
RabbitMQ学习笔记(七)——RabbitMQ分布式事务框架-左眼会陪右眼哭の博客
开发为在RabbitMQ学习笔记(四)——RabbitMQ与SpringBoot适配的源码基础上新增代码。

1. 新建数据表

DROP TABLE IF EXISTS `trans_message`;
CREATE TABLE `trans_message`  (
  `id` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '消息ID',
  `service` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '服务名称',
  `type` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '消息类型',
  `exchange` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '交换机',
  `routing_Key` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '路由键',
  `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '队列',
  `sequence` int(11) NULL DEFAULT NULL COMMENT '序号',
  `payload` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '消息内容',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`, `service`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

2. 修改配置文件

application.properties增加moodymq

#订单微服务配置类
server.port=8080
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
rabbitmq.host=192.168.137.138
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.exchange=exchange.food
spring.rabbitmq.addresses=192.168.137.138
spring.rabbitmq.host=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 自动ack
spring.rabbitmq.listener.direct.acknowledge-mode=auto

moodymq.service=orderService
moodymq.host=192.168.137.138
moodymq.port=5672
moodymq.username=guest
moodymq.password=guest
moodymq.vhost=/
# 重复消费最多五次
moodymq.resendTimes=5
# 重复消费间隔时长
moodymq.resendFreq=5000

3. 新建状态枚举、Po和Dao层

TransMessageType.java

package cn.kt.food.orderservicemanager.moodymq.enummeration;
public enum TransMessageType {
    SEND,
    RECEIVE,
    DEAD;
}

TransMessagePO.java

package cn.kt.food.orderservicemanager.moodymq.po;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.Date;

@Getter
@Setter
@ToString
public class TransMessagePO {
    private String id;
    private String service;
    private TransMessageType type;
    private String exchange;
    private String routingKey;
    private String queue;
    private Integer sequence;
    private String payload;
    private Date date;
}

TransMessageDao.java

package cn.kt.food.orderservicemanager.moodymq.dao;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;

@Mapper
@Repository
public interface TransMessageDao {

    @Insert("INSERT INTO trans_message (id, type, service, " +
            "exchange, routing_key, queue, sequence, payload," +
            "date) " +
            "VALUES(#{id}, #{type}, #{service},#{exchange}," +
            "#{routingKey},#{queue},#{sequence}, #{payload},#{date})")
    void insert(TransMessagePO transMessagePO);

    @Update("UPDATE trans_message set type=#{type}, " +
            "service=#{service}, exchange =#{exchange},"+
            "routing_key =#{routingKey}, queue =#{queue}, " +
            "sequence =#{sequence}, payload =#{payload}, " +
            "date =#{date} " +
            "where id=#{id} and service=#{service}")
    void update(TransMessagePO transMessagePO);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "where id=#{id} and service=#{service}")
    TransMessagePO selectByIdAndService(@Param("id") String id,
                                        @Param("service") String service);

    @Select("SELECT id, type, service, exchange, " +
            "routing_key routingKey, queue, sequence, " +
            "payload, date " +
            "FROM trans_message " +
            "WHERE type = #{type} and service = #{service}")
    List<TransMessagePO> selectByTypeAndService(
            @Param("type") String type,
            @Param("service") String service);

    @Delete("DELETE FROM trans_message " +
            "where id=#{id} and service=#{service}")
    void delete(@Param("id") String id,
                @Param("service") String service);
}

4. 发送消息封装send

TransMessageSender.java

package cn.kt.food.orderservicemanager.moodymq.sender;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TransMessageSender {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Autowired
    TransMessageService transMessageService;

    // 发送消息封装
    public void send(String exchange, String routingKey, Object payload) {
        log.info("send(): exchange:{} routingKey:{} payload:{}",
                exchange, routingKey, payload);
        try {
            ObjectMapper mapper = new ObjectMapper();
            String payloadStr = mapper.writeValueAsString(payload);
            System.out.println(payloadStr);
            // 发送前暂存消息
            TransMessagePO transMessagePO =
                    transMessageService.messageSendReady(
                            exchange,
                            routingKey,
                            payloadStr
                    );

            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            // 封装消息
            Message message = new Message(payloadStr.getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(transMessagePO.getId());
            // 发送消息
            rabbitTemplate.convertAndSend(exchange, routingKey, message,
                    new CorrelationData(transMessagePO.getId()));

            log.info("message sent, ID:{}", transMessagePO.getId());

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
}

5. 新建service层和实现类

TransMessageService.java

package cn.kt.food.orderservicemanager.moodymq.service;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import java.util.List;

public interface TransMessageService {
    /**
     * 发送前暂存消息
     *
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReady(String exchange, String routingKey, String body);

    /**
     * 设置消息发送成功,需要把消息删除
     *
     * @param id 消息ID
     */
    void messageSendSuccess(String id);

    /**
     * 设置消息返回,将消息持久化
     *
     * @param id         id
     * @param exchange   exchange
     * @param routingKey routingKey
     * @param body       body
     * @return TransMessagePO
     */
    TransMessagePO messageSendReturn(
            String id, String exchange, String routingKey, String body);

    /**
     * 查询应发未发消息(之前发送出错的消息,还需要重发,未到告警)
     *
     * @return List<TransMessagePO>
     */
    List<TransMessagePO> listReadyMessages();

    /**
     * 记录消息发送次数
     *
     * @param id id
     */
    void messageResend(String id);

    /**
     * 消息重发多次,放弃
     *
     * @param id id
     */
    void messageDead(String id);

    /**
     * 保存监听到的死信消息
     * @param id
     * @param exchange
     * @param routingKey
     * @param queue
     * @param body
     */
    void messageDead(String id, String exchange,
                     String routingKey, String queue,
                     String body);

    /**
     * 消息消费前保存
     *
     * @param id
     * @param exchange
     * @param routingKey
     * @param queue
     * @param body
     * @return
     */
    TransMessagePO messageReceiveReady(
            String id,
            String exchange,
            String routingKey,
            String queue,
            String body);

    /**
     * 消息消费成功
     *
     * @param id
     */
    void messageReceiveSuccess(String id);

}

TransMessageServiceImpl.java

package cn.kt.food.orderservicemanager.moodymq.service;

import cn.kt.food.orderservicemanager.moodymq.dao.TransMessageDao;
import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.List;
import java.util.UUID;

@Service
public class TransMessageServiceImpl implements TransMessageService {

    @Autowired
    TransMessageDao transMessageDao;

    @Value("${moodymq.service}")
    String serviceName;

    @Override
    public TransMessagePO messageSendReady(String exchange, String routingKey, String body) {
        final String messageId = UUID.randomUUID().toString();
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(messageId);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.SEND);
        transMessageDao.insert(transMessagePO);
        return transMessagePO;
    }

    @Override
    public void messageSendSuccess(String id) {
        transMessageDao.delete(id, serviceName);
    }

    @Override
    public TransMessagePO messageSendReturn(String id, String exchange, String routingKey, String body) {
       TransMessagePO selectByIdAndService = transMessageDao.selectByIdAndService(id, serviceName);
        if (selectByIdAndService == null) {
            TransMessagePO transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.SEND);
            transMessageDao.insert(transMessagePO);

            return transMessagePO;
        } else {
            return selectByIdAndService;
        }
//        return messageSendReady(exchange, routingKey, body);
    }

    @Override
    public List<TransMessagePO> listReadyMessages() {
        return transMessageDao.selectByTypeAndService(
                TransMessageType.SEND.toString(), serviceName
        );
    }

    @Override
    public void messageResend(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        transMessagePO.setSequence(transMessagePO.getSequence() + 1);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id) {
        TransMessagePO transMessagePO = transMessageDao.selectByIdAndService(id, serviceName);
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.update(transMessagePO);
    }

    @Override
    public void messageDead(String id, String exchange, String routingKey, String queue, String body) {
        TransMessagePO transMessagePO = new TransMessagePO();
        transMessagePO.setId(id);
        transMessagePO.setService(serviceName);
        transMessagePO.setExchange(exchange);
        transMessagePO.setRoutingKey(routingKey);
        transMessagePO.setQueue(queue);
        transMessagePO.setPayload(body);
        transMessagePO.setDate(new Date());
        transMessagePO.setSequence(0);
        transMessagePO.setType(TransMessageType.DEAD);
        transMessageDao.insert(transMessagePO);
    }

    @Override
    public TransMessagePO messageReceiveReady(
            String id, String exchange,
            String routingKey, String queue, String body) {

        TransMessagePO transMessagePO =
                transMessageDao.selectByIdAndService(id, serviceName);
        if (null == transMessagePO) {
            // 说明是第一次消费
            transMessagePO = new TransMessagePO();
            transMessagePO.setId(id);
            transMessagePO.setService(serviceName);
            transMessagePO.setExchange(exchange);
            transMessagePO.setRoutingKey(routingKey);
            transMessagePO.setQueue(queue);
            transMessagePO.setPayload(body);
            transMessagePO.setDate(new Date());
            transMessagePO.setSequence(0);
            transMessagePO.setType(TransMessageType.RECEIVE);
            transMessageDao.insert(transMessagePO);
        } else {
            // 否则消费次数 + 1
            transMessagePO.setSequence(transMessagePO.getSequence() + 1);
            transMessageDao.update(transMessagePO);
        }
        return transMessagePO;
    }

    @Override
    public void messageReceiveSuccess(String id) {
        // 消费成功后删除消息
        transMessageDao.delete(id, serviceName);
    }
}

6. 新建config配置RabbitAdmin和RabbitTemplate实现消息的监听和确认逻辑

消息监听使用手动ack
消息确认机制消息投递至交换机失败进行消息重发
MoodyRabbitConfig.java

package cn.kt.food.orderservicemanager.moodymq.config;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MoodyRabbitConfig {

    @Autowired
    TransMessageService transMessageService;

    @Value("${moodymq.host}")
    String host;
    @Value("${moodymq.port}")
    int port;
    @Value("${moodymq.username}")
    String username;
    @Value("${moodymq.password}")
    String password;
    @Value("${moodymq.vhost}")
    String vhost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        // CORRELATED:发送消息的成功还是失败需要有id的参数,因为确认消息是异步的,需要确认哪条消息被确认,
        // 体现在发送消息前持久化时设置id:message.getMessageProperties().setMessageId(transMessagePO.getId());
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        // 消息无法路由的时候需要设置消息返回
        connectionFactory.setPublisherReturns(true);
        connectionFactory.createConnection();
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin admin = new RabbitAdmin(connectionFactory);
        admin.setAutoStartup(true);
        return admin;
    }

    /* 配置消费端消息监听 */
    @Bean
    public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        // 手动ack
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    /* 消息是否路由的消息确认机制 */
    @Bean
    public RabbitTemplate customRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // 消息确认回调
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{}, ack:{}, cause:{}",
                    correlationData, ack, cause);
            // 如果消息确认接收
            if (ack && null != correlationData) {
                String messageId = correlationData.getId();
                log.info("消息已经正确投递到交换机, id:{}", messageId);
                transMessageService.messageSendSuccess(messageId);
            } else {
                // 如果消息确认接收失败,则消息保留,等待下次重发
                log.error("消息投递至交换机失败,correlationData:{}", correlationData);
            }
        });
        // 当消息未进入队列时回调
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            log.error("消息无法路由!message:{}, replyCode:{} replyText:{} exchange:{} routingKey:{}",
                    returnedMessage.getMessage(), returnedMessage.getReplyCode(),
                    returnedMessage.getReplyText(), returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey());
            transMessageService.messageSendReturn(
                    returnedMessage.getMessage().getMessageProperties().getMessageId(),
                    returnedMessage.getExchange(),
                    returnedMessage.getRoutingKey(),
                    new String(returnedMessage.getMessage().getBody())
            );
        });
        return rabbitTemplate;
    }
}

7. 新建抽象类实现ChannelAwareMessageListener完成消息监听

监听消息的接收和业务执行是否异常,如果消息处理异常,则消息重回队列
AbstractMessageListener.java

package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import java.io.IOException;

@Slf4j
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    @Autowired
    TransMessageService transMessageService;
    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    public abstract void receviceMessage(Message message) throws JsonProcessingException;

    @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
        MessageProperties messageProperties = message.getMessageProperties();
        // deliveryTag:跟消息接收确认有关的数字
        long deliveryTag = messageProperties.getDeliveryTag();
        // 持久化接收到的消息
        log.info("收到的消息{}", new String(message.getBody()));
        TransMessagePO transMessagePO =
                transMessageService.messageReceiveReady(
                        messageProperties.getMessageId(),
                        messageProperties.getReceivedExchange(),
                        messageProperties.getReceivedRoutingKey(),
                        messageProperties.getConsumerQueue(),
                        new String(message.getBody())
                );
        log.info("收到消息{}, 消费次数{}",
                messageProperties.getMessageId(), transMessagePO.getSequence());

        try {
            // 该方法让业务去执行,这里抓异常
            receviceMessage(message);
            // 消息处理完成
            channel.basicAck(deliveryTag, false);
            transMessageService.messageReceiveSuccess(messageProperties.getMessageId());
        } catch (Exception e) {
            // 消息处理异常
            log.error(e.getMessage(), e);
            // 判断该消息的消费次数
            if (transMessagePO.getSequence() >= resendTimes) {
                // 消费次数超限,拒收消息
                channel.basicReject(deliveryTag, false);
            } else {
                // 消息重回队列
                Thread.sleep((long) (Math.pow(2, transMessagePO.getSequence())) * 1000);
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}

8. 配置死信消息告警

声明死信交换机、队列和绑定
DlxConfig.java

package cn.kt.food.orderservicemanager.moodymq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxConfig {
    /*
    * 声明死信交换机、队列和绑定
    */
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange("exchange.dlx");
    }

    @Bean
    public Queue dlxQueue() {
        return new Queue("queue.dlx",
                true,
                false,
                false);
    }

    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("#");
    }
}

监听死信消息
DlxListener.java

package cn.kt.food.orderservicemanager.moodymq.listener;

import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

@Component
@Slf4j
//配置是不是需要监听死信:@ConditionalOnProperty实现是通过havingValue与配置文件中的值对比,返回为true则配置类生效,反之失效.
@ConditionalOnProperty("moodymq.dlxEnabled")
public class DlxListener implements ChannelAwareMessageListener {
    @Autowired
    TransMessageService transMessageService;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String messageBody = new String(message.getBody());
        log.error("dead letter! message:{}", message);
        //发邮件、打电话、发短信
        //XXXXX()
        MessageProperties messageProperties = message.getMessageProperties();
        transMessageService.messageDead(
                messageProperties.getMessageId(),
                messageProperties.getReceivedExchange(),
                messageProperties.getReceivedRoutingKey(),
                messageProperties.getConsumerQueue(),
                messageBody
        );
        // 单条确认
        channel.basicAck(messageProperties.getDeliveryTag(), false);
    }
}

9. 配置定时任务

每隔5秒巡检异常消息
ResendTask.java

package cn.kt.food.orderservicemanager.moodymq.task;

import cn.kt.food.orderservicemanager.moodymq.enummeration.TransMessageType;
import cn.kt.food.orderservicemanager.moodymq.po.TransMessagePO;
import cn.kt.food.orderservicemanager.moodymq.service.TransMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

@EnableScheduling
@Configuration
@Component
@Slf4j
public class ResendTask {

    @Autowired
    TransMessageService transMessageService;
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Value("${moodymq.resendTimes}")
    Integer resendTimes;

    // 在配置类中取出来
    @Scheduled(fixedDelayString = "${moodymq.resendFreq}")
    public void resendMessage(){
        log.info("resendMessage() invoked.");
        List<TransMessagePO> messagePOS =
                transMessageService.listReadyMessages();
        log.info("resendMessage(): messagepos:{}", messagePOS);

        for (TransMessagePO po: messagePOS) {
            log.info("resendMessage(): po:{}", po);
            // 过滤dead消息
            if(po.getSequence() > resendTimes){
                log.error("resend too many times!");
                transMessageService.messageDead(po.getId());
                continue;
            }
            // 封装和发送消息
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setContentType("application/json");
            Message message = new Message(po.getPayload().getBytes(), messageProperties);
            message.getMessageProperties().setMessageId(po.getId());
            rabbitTemplate.convertAndSend(
                    po.getExchange(),
                    po.getRoutingKey(),
                    message,
                    new CorrelationData(po.getId()));

            log.info("message sent, ID:{}", po.getId());
            // 消息重发,发送次数+1
            transMessageService.messageResend(po.getId());
        }
    }
}

10. 改造moddymq包外的业务代码

  1. 继承moddymq包中抽象出来的监听方法
  2. 注解绑定交换机队列配置改用RabbitConfig配置类中使用@Bean配置
  3. 监听消息的handMessage改用抽象类的方法receviceMessage进行重写
  4. 在moddymq包外的业务代码中使用包内封装的发送方法
    具体实现详情看下面源码

源码下载

https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_3

小结

  1. 消息发送失败重试,消息消费失败重试,死信消息告警只是有效的保证rabbitMQ消息的事务一致性,有效的解决了消息失败的结果。
  2. 在实际项目中可以把开发的分布式事务框架包moddymq新建另外一个项目,并打成jar包,统一使用规范供多微服务模块使用
  3. 本moddymq中并无注明给死信队列queue.dlx发送消息的场景,实际开发中可以定时将状态为DEAD的消息发送至死信队列进行死信告警。告警方法方法已给出,但具体告警逻辑可以根据实际场景需要进行完善。
  4. 在源代码中,沿用了RabbitMQ快速上手中的订单微服务的案例,改造使用了该分布式的事务框架。
0 条回应
在线人数:1人 来访统计
隐藏