超时关单方案选择

在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内,那他们是怎么实现的呢?对此我查到了不少解决方案,其中有:

  1. Redis过期消息监听

  2. Redisson的DelayQueue

  3. RabbitMQ死信队列

  4. RabbitMQ延迟插件

这四个方案中,十分不推荐的是第一个Redis过期消息监听 ,因为redis 不能保证会在过期后立即删除过期键并发送通知,稍微有点数据量就会导致监听延后非常严重,这里可以参考:请勿过度依赖Redis的过期监听 - 掘金 (juejin.cn)

第二不推荐的是RabbitMQ死信队列,它被设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。和Redis过期消息监听一样被用于做延迟任务都是作者意料之外的骚操作。这个方案的缺点是死信队列是先进先出的,第一条消息没有过期,后面的消息即使过期了也不会被处理。但我们的超时关单过期时间明显是固定的,只需要创建一个队列专门处理超时关单的消息即可。

然而Redisson的DelayQueue和RabbitMQ延迟插件也不是万能的,前者需要注意数据的持久化,保证数据不丢失。后者需要注意控制队列中消息数量不能过多(10W+),否则会占用大量CPU资源。

Redisson实现延迟消息

一、引入依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>

		<!-- Redisson的版本自行匹配,此处SpringBoot3.2.4 -->
		<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson-spring-boot-starter</artifactId>
			<version>3.27.2</version>
		</dependency>

二、配置Redis

## SpringBoot2.x及以下是spring.redis,  3.x才是spring.data.redis
spring:
  data:
    redis:
      host: 127.0.0.1
      port: 6379
      password: admin

三、Redisson实现延迟队列

@Service
@RequiredArgsConstructor
@Slf4j
public class RedisMessageService {

    private final RedissonClient redisson;

    private RBlockingQueue<String> blockingQueue;

    private RDelayedQueue<String> delayedQueue;

    /**
     * 依赖加载完毕后进行初始化
     */
    @PostConstruct
    public void init() {
        blockingQueue = redisson.getBlockingQueue("blockingQueue");
        delayedQueue = redisson.getDelayedQueue(blockingQueue);
        // 开启一个线程持续监听BlockingQueue,队列为空则阻塞等待
        new Thread(() -> {
            log.info("start listen delayedQueue...");
            while(true) {
                try {
                    //延迟队列有数据就返回,否则wait
                    log.info("接收时间:" + new Date());
                    log.info("接收消息: " + blockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


    /**
     * 向延迟队列发消息
     * @param message message
     */
    public void sendMessage(String message, Long delayTime) {
        delayedQueue.offer(message, delayTime, TimeUnit.SECONDS);
    }

}

四、测试延迟消息

@RestController
@RequiredArgsConstructor
@Slf4j
public class RedisTestController {
    private final RedisMessageService redisMessageService;

    @GetMapping("/redis/send")
    public void send(){
        log.info("发送时间: " + new Date());
        redisMessageService.sendMessage("这就完了?就是这么简单!", 10L);
    }
}

Redisson封装的组件就是这么简单,具体原理本文不过多赘述。

RabbitMQ死信队列实现延迟消息

一、引入依赖

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>2.0.9.graal</version>
		</dependency>

二、配置RabbitMQ

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin

三、配置消息序列化方式,便于收发json数据

@Configuration
@Slf4j
@RequiredArgsConstructor
public class RabbitMQConfiguration {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        //默认是用jdk序列化
        //数据转换为json存入消息队列,方便可视化界面查看消息数据
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

        return rabbitTemplate;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}

四、创建 交换机、队列,并绑定关系(重要)

扫盲:

  1. 什么是交换机: RabbitMQ 交换机详解_RabbitMQ 入门教程-慕课网 (imooc.com)

  2. 什么是死信队列: 浅谈RabbitMQ——死信队列与延迟队列 - 知乎 (zhihu.com)

我这里创建了三个队列

  1. queue 接收普通消息

  2. delayQueue 接收需要延迟的消息,到期后根据死信配置发送到其他队列

  3. transmitQueue 接收delayQueue 中过期的消息,作为中转站将消息发往真正的目的地(可以直接配置delayQueue 过期发到queue ,为了更灵活所以加一个中转站)

/**
 * 创建交换机
 * 创建队列并绑定到交换机上
 */
@Configuration
public class QueueConfiguration {

    // 创建交换机
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(MQConstant.EXCHANGE, true, false);
    }

    // 创建测试队列
    @Bean
    public Queue queue() {
        return new Queue(MQConstant.HELLO_QUEUE, true, false, false);
    }
    // 绑定测试队列
    @Bean
    public Binding binding() {
        //队列绑定到exchange上,再绑定好路由键
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE);
    }

    //转发队列
    @Bean
    public Queue transmitQueue() {
        return new Queue(MQConstant.TRANSMIT_QUEUE, true, false, false);
    }
    //绑定转发队列
    @Bean
    public Binding  transmitQueueBinding() {
        return BindingBuilder.bind(transmitQueue()).to(defaultExchange()).with(MQConstant.TRANSMIT_QUEUE);
    }

    // 延迟队列, 需要延迟的消息发送这里,时间耗尽后自动路由到转发队列
    @Bean
    public Queue delayQueue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", MQConstant.EXCHANGE);           // 指定死信交换机
        arguments.put("x-dead-letter-routing-key", MQConstant.TRANSMIT_QUEUE);  // 指定死信路由的key
        return new Queue(MQConstant.DELAY_QUEUE, true, false, false, arguments);
    }
    // 绑定延迟队列
    @Bean
    public Binding delayQueueBinding() {
        return BindingBuilder.bind(delayQueue()).to(defaultExchange()).with(MQConstant.DELAY_QUEUE);
    }
}

五、创建消费者

//监听普通队列,有消息时进行消费
@Component
@RabbitListener(queues = MQConstant.HELLO_QUEUE)
@Slf4j
public class ReceiverMessage {

    @RabbitHandler
    public void process(String content) {
        log.info("接受时间:"+ new Date());
        log.info("接受消息:" + content);
    }
}

//监听转发队列,有消息时,把消息转发到目标队列
@Component
@RequiredArgsConstructor
public class ReceiverDelayMessage {

    private final MessageService messageService;

    @RabbitHandler
    @RabbitListener(queues = MQConstant.TRANSMIT_QUEUE)
    public void process(String content) {
        //此时,才把消息发送到指定队列,而实现延迟功能
        TransmitMessage message = JSON.parseObject(content, TransmitMessage.class);
        messageService.send(message.getQueueName(), message.getContent());
    }

}


/**
 * 转发消息载体
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TransmitMessage implements Serializable {

    @Serial
    private static final long serialVersionUID = 1L;

    private String exchange;

    private String queueName;

    private String content;

    private long times;

}

六、消息发送实现

@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService{

    private final RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName, String message) {
        rabbitTemplate.convertAndSend(MQConstant.EXCHANGE,queueName, message);
    }

    /**
     * 延迟发送消息到队列
     * @param queueName 队列名称
     * @param message 消息内容
     */
    public void send(String queueName, String message, long times) {
        //消息发送到延迟队列,当消息超时时,会路由到转发队列,转发队列根据下面封装的queueName,把消息转发到目的地队列
        //发送前,把消息进行封装,转发时应转发到指定 queueName
        TransmitMessage transmitMessage = new TransmitMessage(MQConstant.EXCHANGE, queueName, message, times);
        MessagePostProcessor processor = new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration(times + "");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(MQConstant.EXCHANGE,MQConstant.DELAY_QUEUE, JSON.toJSONString(transmitMessage),processor);
    }

}


/**
 * Rabbit消息队列相关常量
 */
public final class MQConstant {

    private MQConstant(){}

    // 交换机
    public static final String EXCHANGE = "exchange";

    // 延迟队列
    public static final String DELAY_QUEUE = "delayQueue";

    // 转发队列
    public static final String TRANSMIT_QUEUE = "transmitQueue";

    // 测试队列
    public static final String HELLO_QUEUE = "hello";

}

七、延迟消息测试

@RequiredArgsConstructor
@Slf4j
@RestController
public class TestController {

    private final MessageService messageService;

    private final RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public void send() {
        log.info("发送时间: " + new Date());
        messageService.send(MQConstant.HELLO_QUEUE, "这个有点复杂哦...");

        messageService.send(MQConstant.HELLO_QUEUE, "这个有点复杂哦...我是不是已经说过这句话了??", 5*1000);
    }

}

RabbitMQ延迟插件

一、安装延迟插件

下载插件: Releases · rabbitmq/rabbitmq-delayed-message-exchange (github.com)

安装插件有两种方法

  1. 下载插件后自行上传到服务器并复制到RabbitMQ容器的Plugin文件夹下,比较复杂这里不赘述

  2. 使用dockerfile做一个镜像,在构建时直接把插件包含进去

下载好插件后上传至linux,同文件夹下在创建一个dockerfile(注意修改版本)

FROM rabbitmq:3.13.0-management
COPY ["rabbitmq_delayed_message_exchange-3.13.0.ez" , "/plugins/"]
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在此文件夹执行命令构建镜像并启动容器

docker build -t rabbitmq:3.8.2-management .

docker run -it -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 -v /mydata/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin123456 rabbitmq:3.13.0-management

访问15627端口后台页面,发现有这个type就证明延迟插件安装成功了

二、创建延迟交换机、队列

    /**
     * 以下是延迟插件相关配置
     */
    @Bean
    public CustomExchange  delayPluginExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange (MQConstant.DELAY_EXCHANGE, "x-delayed-message",true, false, args);
    }
    //转发队列,中转站,将消息发送到真正的目的地
    @Bean
    public Queue transmitQueue() {
        return new Queue(MQConstant.TRANSMIT_QUEUE, true, false, false);
    }
   @Bean
    public Binding delayPluginQueueBinding() {
        return BindingBuilder.bind(transmitQueue()).to(delayPluginExchange()).with(MQConstant.TRANSMIT_QUEUE).noargs();
    }

三、创建消费者并定义发送消息方法

@Component
@RequiredArgsConstructor
@Slf4j
public class ReceiverDelayPluginMessage {
    @RabbitHandler
    @RabbitListener(queues = MQConstant.DELAY_PLUGIN_QUEUE)
    public void pluginProcess(String content) {
        log.info("接受时间:"+ new Date());
        log.info("接受消息:" + content);
    }
}


@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService{

    private final RabbitTemplate rabbitTemplate;

    /**
     * 发送消息到延迟交换机
     * @param message 消息内容
     * @param times 延迟时间
     */
    public void sendToDelayExchange(String queueName, String message, long times) {
        // 消息直接发给延迟交换机,过期后推到转发队列,转发队列解析封装的TransmitMessage,将消息发到真正的目的地
		// 多使用一层转发队列是为了灵活性,避免只能写死发到一个队列
        TransmitMessage transmitMessage = new TransmitMessage(MQConstant.EXCHANGE, queueName, message, times);
        rabbitTemplate.convertAndSend(MQConstant.DELAY_EXCHANGE, MQConstant.TRANSMIT_QUEUE, JSON.toJSONString(transmitMessage), processor -> {
            // 给每条消息设置过期时间
            processor.getMessageProperties().setDelayLong(times);
            return processor;
        });
    }

}


/**
 * 转发消息载体
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TransmitMessage implements Serializable {

    @Serial
    private static final long serialVersionUID = 1L;

    private String exchange;

    private String queueName;

    private String content;

    private long times;

}


/**
 * Rabbit消息队列相关常量
 */
public final class MQConstant {

    private MQConstant(){}

    // 交换机
    public static final String EXCHANGE = "exchange";

    // 延迟交换机
    public static final String DELAY_EXCHANGE = "delayExchange";

    // 延迟队列
    public static final String DELAY_QUEUE = "delayQueue";

    // 转发队列
    public static final String TRANSMIT_QUEUE = "transmitQueue";

    // 测试队列
    public static final String HELLO_QUEUE = "hello";

    // 延迟插件队列
    public static final String DELAY_PLUGIN_QUEUE = "delayPluginQueue";

}

四、测试延迟消息

    @GetMapping("/send")
    public void send() {
        log.info("发送时间: " + new Date());

        messageService.sendToDelayExchange("延迟插件还是好用啊~", 5*1000);
    }

文章作者: 像柔风
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 像柔风的个人博客
默认分类 消息队列 Redis 电商
喜欢就支持一下吧