RabbitMQ

RabbitMQ学习笔记(二)——RabbitMQ快速上手

Nick · 4月21日 · 2022年 · 本文26685字 · 阅读67分钟196

RabbitMQ快速上手的学习案例使用一个高可用外卖系统的demo。
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

高可用外卖系统

高可用外卖系统需求分析

  1. 一个外卖后端系统,用户可以在线下单外卖
  2. 用户下单后,可以实时查询订单进度
  3. 系统可以承受短时间的大量并发请求

架构设计

使用微服务系统,组件之间充分解耦
使用消息中间件,解耦业务逻辑
使用数据库,持久化业务数据
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

什么是微服务架构

将应用程序构建为松耦合、可独立部署的一组服务
服务:一个单一的、可独立部署的软件组件,实现了一些有用的功能
松耦合:封装服务的实现细节,通过API调用

如何拆分微服务

根据系统操作进行微服务拆分
根据业务能力进行微服务拆分(推荐使用)
根据子域进行微服务拆分

根据业务能力进行微服务拆分

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

合理的交换机和队列设置

  • 交换机数量不能过多,一般来说同一个业务,或者同一类业务使用同一个交换机
  • 合理设置队列数量,一般来说一个微服务监听一个队列,或者一个微服务的一个业务监听一个队列
  • 合理配置交换机类型,使用Topic模式时仔细设置绑定键
  • 尽量使用自动化 配置将创建交换机/队列的操作固化在应用代码中,免去复杂的运维操作,高效且不易出错

业务流程时序图

RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

接口需求

新建订单接口
查询订单接口
接口采用REST风格

微服务的数据库设计原则

每个微服务使用自己的数据库
不要使用共享数据库的方式进行通信
不要使用外键,对于数据量非常少的表慎用索引
RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客
food.sql

-- ----------------------------
-- Table structure for deliveryman
-- ----------------------------
DROP TABLE IF EXISTS `deliveryman`;
CREATE TABLE `deliveryman`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '骑手id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of deliveryman
-- ----------------------------
INSERT INTO `deliveryman` VALUES (1, 'wangxiaoer', 'AVALIABLE', '2020-06-10 20:30:17');

-- ----------------------------
-- Table structure for order_detail
-- ----------------------------
DROP TABLE IF EXISTS `order_detail`;
CREATE TABLE `order_detail`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '订单id',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `address` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '订单地址',
  `account_id` int(11) NULL DEFAULT NULL COMMENT '用户id',
  `product_id` int(11) NULL DEFAULT NULL COMMENT '产品id',
  `deliveryman_id` int(11) NULL DEFAULT NULL COMMENT '骑手id',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `reward_id` int(11) NULL DEFAULT NULL COMMENT '积分奖励id',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '价格',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 27 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of order_detail
-- ----------------------------
INSERT INTO `order_detail` VALUES (9, 'SETTLEMENT_CONFIRMED', '深圳', 12145, 2, 1, 2, NULL, 23.25, '2022-04-04 17:57:02');
INSERT INTO `order_detail` VALUES (10, 'ORDER_CREATED', '深圳', 12145, 2, 1, 3, 1, 23.25, '2022-04-05 23:57:19');

-- ----------------------------
-- Table structure for product
-- ----------------------------
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '产品id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `price` decimal(9, 2) NULL DEFAULT NULL COMMENT '单价',
  `restaurant_id` int(11) NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of product
-- ----------------------------
INSERT INTO `product` VALUES (2, 'eqwe', 23.25, 1, 'AVALIABLE', '2020-05-06 19:19:04');

-- ----------------------------
-- Table structure for restaurant
-- ----------------------------
DROP TABLE IF EXISTS `restaurant`;
CREATE TABLE `restaurant`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '餐厅id',
  `name` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '名称',
  `address` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '地址',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `settlement_id` int(11) NULL DEFAULT NULL COMMENT '结算id',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of restaurant
-- ----------------------------
INSERT INTO `restaurant` VALUES (1, 'qeqwe', '2weqe', 'OPEN', 1, '2020-05-06 19:19:39');

-- ----------------------------
-- Table structure for reward
-- ----------------------------
DROP TABLE IF EXISTS `reward`;
CREATE TABLE `reward`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '奖励id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '积分量',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 4 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of reward
-- ----------------------------
INSERT INTO `reward` VALUES (1, 10, 23.25, 'SUCCESS', '2022-04-06 00:00:01');

-- ----------------------------
-- Table structure for settlement
-- ----------------------------
DROP TABLE IF EXISTS `settlement`;
CREATE TABLE `settlement`  (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '结算id',
  `order_id` int(11) NULL DEFAULT NULL COMMENT '订单id',
  `transaction_id` int(11) NULL DEFAULT NULL COMMENT '交易id',
  `amount` decimal(9, 2) NULL DEFAULT NULL COMMENT '金额',
  `status` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '状态',
  `date` datetime(0) NULL DEFAULT NULL COMMENT '时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 6 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of settlement
-- ----------------------------
INSERT INTO `settlement` VALUES (2, 9, 571087981, 23.25, 'SUCCESS', '2022-04-04 17:59:08');

原生RabbitMQ快速上手步骤

订单微服务搭建步骤:

  1. 目录结构
    RabbitMQ学习笔记(二)——RabbitMQ快速上手-左眼会陪右眼哭の博客

  2. 导入pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.kt</groupId>
    <artifactId>food</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>food</name>
    <description>food System</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.6</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
    </project>
  3. 编写配置文件application.properties

    #订单微服务配置类
    server.port=8080
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/food?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    #Rabbitmq相关配置
    rabbitmq.host=192.168.137.133
    rabbitmq.port=5672
    rabbitmq.username=guest
    rabbitmq.password=guest
    #本服务使用的交换机
    rabbitmq.exchange=exchange.food
  4. 编写PO、VO、DTO等数据传输对象

OrderDetailPO.java(存数据库所用类型)

package cn.kt.food.orderservicemanager.po;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
/**
 * @author tao
 * @date 2022-03-22 21:36
 * 概要:存数据库所用类型
 */

@Data
public class OrderDetailPO {
    private Integer id;
    private OrderStatusEnum status;
    private String address;
    private Integer accountId;
    private Integer productId;
    private Integer deliverymanId;
    private Integer settlementId;
    private Integer rewardId;
    private BigDecimal price;
    private Date date;
}

OrderCreateVO.java(前端传进来的数据)

package cn.kt.food.orderservicemanager.vo;
import lombok.Data;

/**
 * @author tao
 * @date 2022-03-22 21:25
 * 概要:  vo:前端传进来的数据
 */
@Data
public class OrderCreateVO {
    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 地址
     */
    private String address;

    /**
     * 产品ID
     */
    private Integer productId;
}

OrderMessageDTO.java(消息体,用于传输数据)

package cn.kt.food.orderservicemanager.dto;
import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
import lombok.Data;
import java.math.BigDecimal;

/**
 * @author tao
 * @date 2022-03-22 21:27
 * 概要:dto:消息体,用于传输数据
 */
@Data
public class OrderMessageDTO {

    /**
     * 订单ID
     */
    private Integer orderId;

    /**
     * 订单状态
     */
    private OrderStatusEnum orderStatus;

    /**
     * 价格
     */
    private BigDecimal price;

    /**
     * 骑手ID
     */
    private Integer deliverymanId;

    /**
     * 产品ID
     */
    private Integer productId;

    /**
     * 用户ID
     */
    private Integer accountId;

    /**
     * 结算ID
     */
    private Integer settlementId;

    /**
     * 积分结算ID
     */
    private Integer rewardId;

    /**
     * 积分奖励数量
     */
    private BigDecimal rewardAmount;

    /**
     * 确认
     */
    private Boolean confirmed;
}
  1. 编写订单状态枚举类OrderStatusEnum.java

    package cn.kt.food.orderservicemanager.enums;
    /**
    * @author tao
    * @date 2022-03-22 21:29
    * 概要:  订单状态枚举
    */
    public enum OrderStatusEnum {
    
    /**
     * 创建中
     */
    ORDER_CREATING,
    
    /**
     * 餐厅已确认
     */
    
    RESTAURANT_CONFIRMED,
    
    /**
     * 骑手确认
     */
    DELIVERYMAN_CONFIRMED,
    
    /**
     * 已结算
     */
    SETTLEMENT_CONFIRMED,
    
    /**
     * 订单已创建
     */
    ORDER_CREATED,
    
    /**
     * 订单创建失败
     */
    FAILED;
    }
  2. 编写数据库dao层OrderDetailDao.java

    package cn.kt.food.orderservicemanager.dao;
    import cn.kt.food.orderservicemanager.po.OrderDetailPO;
    import org.apache.ibatis.annotations.Insert;
    import org.apache.ibatis.annotations.Mapper;
    import org.apache.ibatis.annotations.Options;
    import org.apache.ibatis.annotations.Select;
    import org.apache.ibatis.annotations.Update;
    import org.springframework.stereotype.Repository;
    /**
    * @author tao
    * @date 2022-03-22 21:39
    * 概要:
    */
    @Mapper
    @Repository
    public interface OrderDetailDao {
    
    @Insert("INSERT INTO order_detail (status, address, account_id, product_id, deliveryman_id, settlement_id, " +
            "reward_id, price, date) VALUES(#{status}, #{address},#{accountId},#{productId},#{deliverymanId}," +
            "#{settlementId}, #{rewardId},#{price}, #{date})")
    @Options(useGeneratedKeys = true, keyProperty = "id")
    void insert(OrderDetailPO orderDetailPO);
    
    @Update("update order_detail set status =#{status}, address =#{address}, account_id =#{accountId}, " +
            "product_id =#{productId}, deliveryman_id =#{deliverymanId}, settlement_id =#{settlementId}, " +
            "reward_id =#{rewardId}, price =#{price}, date =#{date} where id=#{id}")
    void update(OrderDetailPO orderDetailPO);
    
    @Select("SELECT id,status,address,account_id accountId, product_id productId,deliveryman_id deliverymanId," +
            "settlement_id settlementId,reward_id rewardId,price, date FROM order_detail WHERE id = #{id}")
    OrderDetailPO selectOrder(Integer id);
    }
  3. 编写处理用户订单的业务请求service
    OrderService.java

    package cn.kt.food.orderservicemanager.service;
    import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
    import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
    import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
    import cn.kt.food.orderservicemanager.po.OrderDetailPO;
    import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.*;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import java.io.IOException;
    import java.util.Date;
    import java.util.concurrent.TimeoutException;
    /**
    * @author tao
    * @date 2022-03-24 21:13
    * 概要:  处理用户关于订单的业务请求
    */
    @Slf4j
    @Service
    public class OrderService {
    @Autowired
    private OrderDetailDao orderDetailDao;
    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @Value("${rabbitmq.host}")
    public String host;
    @Value("${rabbitmq.exchange}")
    public String exchangeName;
    
    private ObjectMapper objectMapper = new ObjectMapper();
    
    // 创建订单
    public void createOrder(OrderCreateVO orderCreateVO) throws IOException, TimeoutException {
        log.info("createOrder:orderCreateVO:{}", orderCreateVO);
        OrderDetailPO orderPO = new OrderDetailPO();
        orderPO.setAddress(orderCreateVO.getAddress());
        orderPO.setAccountId(orderCreateVO.getAccountId());
        orderPO.setProductId(orderCreateVO.getProductId());
        orderPO.setStatus(OrderStatusEnum.ORDER_CREATING);
        orderPO.setDate(new Date());
        // 会返回数据库自动生成的数据
        orderDetailDao.insert(orderPO);
    
        OrderMessageDTO orderMessageDTO = new OrderMessageDTO();
        orderMessageDTO.setOrderId(orderPO.getId());
        orderMessageDTO.setProductId(orderPO.getProductId());
        orderMessageDTO.setAccountId(orderCreateVO.getAccountId());
    
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
    
        // 创建订单之后给restaurant发消息
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 配置channel,开启确认模式
            channel.confirmSelect();
    
            //单条同步确认机制
            if (channel.waitForConfirms()) {
                log.info("RabbitMQ confirm success");
            } else {
    
                log.info("RabbitMQ confirm failed");
            }
    
            // 异步同步确认机制
            ConfirmListener confirmListener = new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    log.info("Ack deliveryTag:{},mutiple:{}", l, b);
                    // 消息发送成功
                }
    
                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    log.info("Nack deliveryTag:{},mutiple:{}", l, b);
                    // 消息发送失败
                }
            };
            channel.addConfirmListener(confirmListener);
    
            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
    
            //(exchange,routingKey,消息特殊参数,消息体本身(字节))
            // channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
    
            // 设置单条消息的过期时间(时间到期后消息会被消费)
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());
            /*for (int i = 0; i < 50; i++) {
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
                log.info("message sent");
            }*/
    
            // 发送多条消息
            /*for (int i = 0; i < 10; i++) {
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
                log.info("message sent");
            }
            Thread.sleep(10000);*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    }
  4. 编写消息处理相关业务逻辑service
    OrderMessageService.java

    package cn.kt.food.orderservicemanager.service;
    import cn.kt.food.orderservicemanager.dao.OrderDetailDao;
    import cn.kt.food.orderservicemanager.dto.OrderMessageDTO;
    import cn.kt.food.orderservicemanager.enums.OrderStatusEnum;
    import cn.kt.food.orderservicemanager.po.OrderDetailPO;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
    * @author tao
    * @date 2022-03-24 21:15
    * 概要:消息处理相关业务逻辑
    */
    @Slf4j
    @Service
    public class OrderMessageService {
    @Value("${rabbitmq.host}")
    public String host;
    @Value("${rabbitmq.exchange}")
    public String exchangeName;
    
    @Autowired
    private OrderDetailDao orderDetailDao;
    ObjectMapper objectMapper = new ObjectMapper();
    /**
     * 声明消息队列、交换机、绑定、消息的处理
     * (异步线程调用这个方法,且异步线程不能退出,注册完消费者之后sleep,需要设置线程池)
     */
    @Async
    public void handleMessage() throws IOException, TimeoutException, InterruptedException {
        log.info("start linstening message");
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
    
            /*---------------------restaurant微服务(声明)---------------------*/
            // order交换机
            channel.exchangeDeclare(
                    "exchange.order.restaurant",    //交换机名称
                    BuiltinExchangeType.DIRECT,     //交换机类型
                    true,   //是否持久化
                    false,  //是否交换机长时间不使用删除
                    null);  //是否交换机长时间不使用删除
    
            // 订单队列
            channel.queueDeclare(
                    "queue.order",  //队列名称
                    true,   //是否持久化
                    false,  // 队列是否独占(独占只允许一个应用连接)
                    false,  //是否交换机长时间不使用删除
                    null);  //是否交换机长时间不使用删除
    
            // 队列绑定交换机
            channel.queueBind(
                    "queue.order",  //队列名称
                    "exchange.order.restaurant",    //交换机名称
                    "key.order");   //路由键,用来指示消息的路由转发,相当于快递的地址
    
            /*---------------------deliveryman微服务---------------------*/
            // 骑手交换机
            channel.exchangeDeclare(
                    "exchange.order.deliveryman",
                    BuiltinExchangeType.DIRECT,
                    true,
                    false,
                    null);
    
            channel.queueBind(
                    "queue.order",
                    "exchange.order.deliveryman",
                    "key.order");
    
            /*---------------------settlement微服务---------------------*/
            // 结算交换机
            channel.exchangeDeclare(
                    "exchange.order.settlement",
                    BuiltinExchangeType.FANOUT,
                    true,
                    false,
                    null);
    
            channel.queueBind(
                    "queue.order",
                    "exchange.settlement.order",
                    "key.order");
    
            /*---------------------reward微服务---------------------*/
            // 积分交换机
            channel.exchangeDeclare(
                    "exchange.order.reward",
                    BuiltinExchangeType.TOPIC,
                    true,
                    false,
                    null);
    
            channel.queueBind(
                    "queue.order",
                    "exchange.order.reward",
                    "key.order");// 降级使用,没有使用到TOPIC的特性
    
            /**
             * 监听订单状态
             * (队列,是不是ACK,回调函数,消费者标签)
             */
            channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
            });
            while (true) {
                Thread.sleep(100000);
            }
        }
    }
    
    DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        try {
            // 将消息体反序列化成DTO
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);
            // 读取数据库中的PO
            OrderDetailPO orderPO = orderDetailDao.selectOrder(orderMessageDTO.getOrderId());
            switch (orderPO.getStatus()) {
                case ORDER_CREATING:
                    // 修改订单状态
                    if (orderMessageDTO.getConfirmed() && null != orderMessageDTO.getPrice()) {
                        orderPO.setStatus(OrderStatusEnum.RESTAURANT_CONFIRMED);
                        orderPO.setPrice(orderMessageDTO.getPrice());
                        orderDetailDao.update(orderPO);
                        // 订单状态更新后给骑手发消息
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.deliveryman",
                                    "key.deliveryman",
                                    null,
                                    messageToSend.getBytes());
                        }
                    } else {
                        orderPO.setStatus(OrderStatusEnum.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case RESTAURANT_CONFIRMED:
                    if (null != orderMessageDTO.getDeliverymanId()) {
                        orderPO.setStatus(OrderStatusEnum.DELIVERYMAN_CONFIRMED);
                        orderPO.setDeliverymanId(orderMessageDTO.getDeliverymanId());
                        orderDetailDao.update(orderPO);
                        // 发消息
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.settlement",
                                    "key.settlement",
                                    null,
                                    messageToSend.getBytes());
                        }
                    } else {
                        orderPO.setStatus(OrderStatusEnum.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case DELIVERYMAN_CONFIRMED:
                    if (null != orderMessageDTO.getSettlementId()) {
                        orderPO.setStatus(OrderStatusEnum.SETTLEMENT_CONFIRMED);
                        orderPO.setSettlementId(orderMessageDTO.getSettlementId());
                        orderDetailDao.update(orderPO);
                        try (Connection connection = connectionFactory.newConnection();
                             Channel channel = connection.createChannel()) {
                            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                            channel.basicPublish("exchange.order.reward", "key.reward", null, messageToSend.getBytes());
                        }
                    } else {
                        orderPO.setStatus(OrderStatusEnum.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
                case SETTLEMENT_CONFIRMED:  // 订单创建完成
                    if (null != orderMessageDTO.getRewardId()) {
                        orderPO.setStatus(OrderStatusEnum.ORDER_CREATED);
                        orderPO.setRewardId(orderMessageDTO.getRewardId());
                        orderDetailDao.update(orderPO);
                    } else {
                        orderPO.setStatus(OrderStatusEnum.FAILED);
                        orderDetailDao.update(orderPO);
                    }
                    break;
            }
    
        } catch (JsonProcessingException | TimeoutException e) {
            e.printStackTrace();
        }
    };
    }
  5. 编写接口controller
    OrderController.java

    package cn.kt.food.orderservicemanager.controller;
    import cn.kt.food.orderservicemanager.service.OrderService;
    import cn.kt.food.orderservicemanager.vo.OrderCreateVO;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
    * @author tao
    * @date 2022-03-24 22:12
    * 概要:
    */
    @Slf4j
    @RestController
    @RequestMapping("api/v1")
    public class OrderController {
    @Autowired
    OrderService orderService;
    
    @PostMapping("/orders")
    public void createOrder(@RequestBody OrderCreateVO orderCreateDTO) throws IOException, TimeoutException {
        log.info("createOrder:orderCreateDTO:{}", orderCreateDTO);
        orderService.createOrder(orderCreateDTO);
    }
    }
  6. 线程池配置类和自动监听配置
    线程池配置类:AsyncTaskConfig.java

    package cn.kt.food.orderservicemanager.config;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import java.util.concurrent.Executor;
    /**
    * @author tao
    * @date 2022-03-24 22:44
    * 概要:  线程池配置类
    */
    @Configuration
    @EnableAsync
    public class AsyncTaskConfig implements AsyncConfigurer {
    
    // ThredPoolTaskExcutor的处理流程
    // 当池子大小小于corePoolSize,就新建线程,并处理请求
    // 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
    // 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
    // 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
    
    @Override
    @Bean
    public Executor getAsyncExecutor() {
        // 起一个线程池
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数
        threadPool.setCorePoolSize(10);
        //设置最大线程数
        threadPool.setMaxPoolSize(100);
        //线程池所使用的缓冲队列
        threadPool.setQueueCapacity(10);
        //等待任务在关机时完成--表明等待所有线程执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
        threadPool.setAwaitTerminationSeconds(60);
        //  线程名称前缀
        threadPool.setThreadNamePrefix("Rabbit-Async-");
        // 初始化线程
        threadPool.initialize();
        return threadPool;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
    }

    RabbitMQ需要自动执行并且实时监听,因此需要配置自动执行OrderMessageService中handleMessage方法
    RabbitConfig.java

package cn.kt.food.orderservicemanager.config;
import cn.kt.food.orderservicemanager.service.OrderMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author tao
 * @date 2022-03-24 22:58
 * 概要:  自动执行OrderMessageService中handleMessage方法(配置了RabbitMQ的交换机等)
 */
@Slf4j
@Configuration
public class RabbitConfig {
    @Autowired
    OrderMessageService orderMessageService;
    //配置类中的@Autowired方法会被自动调用
    @Autowired
    public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
        orderMessageService.handleMessage();
    }
}

订单微服务和RabbitMQ的创建大致如上,因此也还有:商家微服务、骑手微服务、结算微服务、积分微服务。
其功能是在订单的每个阶段处理相应的业务逻辑,其中在每个微服务的消息通讯时使用RabbitMQ进行消息的路由和转发,套路和订单微服务差不多一致。

注:其余微服务和总代码放在文章末尾

RabbitMQ使用总结

  1. 新建ConnectionFactory

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setHost("localhost");
  2. Channel处理相关配置和使用basicPublish发送消息
    注意:channel.basicPublish(exchange,routingKey,消息特殊参数,消息体本身(字节))
    RabbitMQ发送的消息体本身是字节

    try (Connection connection = connectionFactory.newConnection();
           Channel channel = connection.createChannel()) {
           // 业务逻辑
    
           // 发送消息处理
           ObjectMapper objectMapper = new ObjectMapper();
           String messageToSend = objectMapper.writeValueAsString("需要发送的消息");
           //(exchange,routingKey,消息特殊参数,消息体本身(字节))
           channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
    
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
  3. 配置RabbitMQ的Exchange和queue

    // 声明交换机
    channel.exchangeDeclare(
     "exchange.name",
     BuiltinExchangeType.DIRECT,
     true,
     false,
     null);
    // 声明消息队列
    channel.queueDeclare(
     "queue.name",
     true,
     false,
     false,
     null);
    // 队列绑定交换机
    channel.queueBind(
     "queue.name",
     "exchange.name",
     "key.name");
  4. 使用basicConsume消费消息

    @Async
    public void handleMessage() {
      /**
        * 监听订单状态
        * (队列,是不是ACK,回调函数,消费者标签)
       */
     channel.basicConsume("queue.name", true, deliverCallback, consumerTag -> {
     });
    }
  5. 定义回调函数
    收到消息后进入的回调函数

    DeliverCallback deliverCallback = (consumerTag, message) -> {
    //业务逻辑
    };
  6. 配置线程池

    @Configuration
    @EnableAsync
    public class AsyncTaskConfig implements AsyncConfigurer {
    // ThredPoolTaskExcutor的处理流程
    // 当池子大小小于corePoolSize,就新建线程,并处理请求
    // 当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去workQueue中取任务并处理
    // 当workQueue放不下任务时,就新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize,就用RejectedExecutionHandler来做拒绝处理
    // 当池子的线程数大于corePoolSize时,多余的线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁
    
    @Override
    @Bean
    public Executor getAsyncExecutor() {
        // 起一个线程池
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
        //设置核心线程数
        threadPool.setCorePoolSize(10);
        //设置最大线程数
        threadPool.setMaxPoolSize(100);
        //线程池所使用的缓冲队列
        threadPool.setQueueCapacity(10);
        //等待任务在关机时完成--表明等待所有线程执行完
        threadPool.setWaitForTasksToCompleteOnShutdown(true);
        // 等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
        threadPool.setAwaitTerminationSeconds(60);
        //  线程名称前缀
        threadPool.setThreadNamePrefix("Rabbit-Async-");
        // 初始化线程
        threadPool.initialize();
        return threadPool;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
    }
  7. 使用线程池启动basicConsume

    //配置类中的@Autowired方法会被自动调用
    @Autowired
    public void startListenMessage() throws IOException, TimeoutException, InterruptedException {
        orderMessageService.handleMessage();
    }

使用原生RabbitMQ项目中的不足之处

消息真的发出去了吗?

消息发送后,发送端不知道RabbitMQ是否真的收到了消息
若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
需要使用RabbitMQ发送端确认机制,确认消息发送

消息真被路由了吗?

消息发送后,发送端不知道消息是否被正确路由,若路由异常,消息会被丢弃
消息丢弃后,订单处理流程停止,业务异常
需要使用RabbitMQ消息返回机制,确认消息被正确路由

消费端处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃
需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK)
消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
需要使用RabbitMQ消费端确认机制,确认消息被正确处理

队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力
需要使用RabbitMQ消息过期时间,防止消息大量积压

如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报
需要使用RabbitMQ死信队列,收集过期消息,以供分析

不足之处总结

目前项目急需引入的RabbitMQ新特性:
发送端确认机制
消费端确认机制
消息返回机制
消息过期机制
消费端限流机制
死信队列

解决这些不足之处需要用到RabbitMQ的高级特性。

实际开发中经验及小结

  1. 使用线程池:对于频繁创建与销毁的线程,必须使用线程池,否则极易线程溢出,造成“线程爆炸”
  2. POJO类单一职责
    a. 各种POJO数据结构必须单一职责,混用会导致代码混乱
    b. PO/DO: (Persistent Object/Data Object)持久对象
    c. DTO:(Data Transfer Object)数据传输对象
    d. BO:(Business Object)业务对象
    e. vo: (View Object)显示层对象

源代码:

https://gitee.com/KT1205529635/rabbit-mq/tree/master/food_master_1

2 条回应
  1. 2132022-6-28 · 15:09

    血小板还能唱歌??女孩子???!厉害

    • Nick2022-6-28 · 15:10

      男孩子哦

在线人数:1人 来访统计
说谎
林宥嘉
隐藏