超时关单 - 别再用Redis过期监听了!
超时关单方案选择
在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内,那他们是怎么实现的呢?对此我查到了不少解决方案,其中有:
Redis过期消息监听
Redisson的DelayQueue
RabbitMQ死信队列
RabbitMQ延迟插件
这四个方案中,十分不推荐的是第一个Redis过期消息监听 ,因为redis 不能保证会在过期后立即删除过期键并发送通知,稍微有点数据量就会导致监听延后非常严重,这里可以参考:请勿过度依赖Redis的过期监听 - 掘金 (juejin.cn)
第二不推荐的是RabbitMQ死信队列,它被设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。和Redis过期消息监听一样被用于做延迟任务都是作者意料之外的骚操作。这个方案的缺点是死信队列是先进先出的,第一条消息没有过期,后面的消息即使过期了也不会被处理。但我们的超时关单过期时间明显是固定的,只需要创建一个队列专门处理超时关单的消息即可。
然而Redisson的DelayQueue和RabbitMQ延迟插件也不是万能的,前者需要注意数据的持久化,保证数据不丢失。后者需要注意控制队列中消息数量不能过多(10W+),否则会占用大量CPU资源。
Redisson实现延迟消息
一、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson的版本自行匹配,此处SpringBoot3.2.4 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.27.2</version>
</dependency>
二、配置Redis
## SpringBoot2.x及以下是spring.redis, 3.x才是spring.data.redis
spring:
data:
redis:
host: 127.0.0.1
port: 6379
password: admin
三、Redisson实现延迟队列
@Service
@RequiredArgsConstructor
@Slf4j
public class RedisMessageService {
private final RedissonClient redisson;
private RBlockingQueue<String> blockingQueue;
private RDelayedQueue<String> delayedQueue;
/**
* 依赖加载完毕后进行初始化
*/
@PostConstruct
public void init() {
blockingQueue = redisson.getBlockingQueue("blockingQueue");
delayedQueue = redisson.getDelayedQueue(blockingQueue);
// 开启一个线程持续监听BlockingQueue,队列为空则阻塞等待
new Thread(() -> {
log.info("start listen delayedQueue...");
while(true) {
try {
//延迟队列有数据就返回,否则wait
log.info("接收时间:" + new Date());
log.info("接收消息: " + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
/**
* 向延迟队列发消息
* @param message message
*/
public void sendMessage(String message, Long delayTime) {
delayedQueue.offer(message, delayTime, TimeUnit.SECONDS);
}
}
四、测试延迟消息
@RestController
@RequiredArgsConstructor
@Slf4j
public class RedisTestController {
private final RedisMessageService redisMessageService;
@GetMapping("/redis/send")
public void send(){
log.info("发送时间: " + new Date());
redisMessageService.sendMessage("这就完了?就是这么简单!", 10L);
}
}
Redisson封装的组件就是这么简单,具体原理本文不过多赘述。
RabbitMQ死信队列实现延迟消息
一、引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.9.graal</version>
</dependency>
二、配置RabbitMQ
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
三、配置消息序列化方式,便于收发json数据
@Configuration
@Slf4j
@RequiredArgsConstructor
public class RabbitMQConfiguration {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
//默认是用jdk序列化
//数据转换为json存入消息队列,方便可视化界面查看消息数据
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}
四、创建 交换机、队列,并绑定关系(重要)
扫盲:
我这里创建了三个队列
queue 接收普通消息
delayQueue 接收需要延迟的消息,到期后根据死信配置发送到其他队列
transmitQueue 接收delayQueue 中过期的消息,作为中转站将消息发往真正的目的地(可以直接配置delayQueue 过期发到queue ,为了更灵活所以加一个中转站)
/**
* 创建交换机
* 创建队列并绑定到交换机上
*/
@Configuration
public class QueueConfiguration {
// 创建交换机
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(MQConstant.EXCHANGE, true, false);
}
// 创建测试队列
@Bean
public Queue queue() {
return new Queue(MQConstant.HELLO_QUEUE, true, false, false);
}
// 绑定测试队列
@Bean
public Binding binding() {
//队列绑定到exchange上,再绑定好路由键
return BindingBuilder.bind(queue()).to(defaultExchange()).with(MQConstant.HELLO_QUEUE);
}
//转发队列
@Bean
public Queue transmitQueue() {
return new Queue(MQConstant.TRANSMIT_QUEUE, true, false, false);
}
//绑定转发队列
@Bean
public Binding transmitQueueBinding() {
return BindingBuilder.bind(transmitQueue()).to(defaultExchange()).with(MQConstant.TRANSMIT_QUEUE);
}
// 延迟队列, 需要延迟的消息发送这里,时间耗尽后自动路由到转发队列
@Bean
public Queue delayQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", MQConstant.EXCHANGE); // 指定死信交换机
arguments.put("x-dead-letter-routing-key", MQConstant.TRANSMIT_QUEUE); // 指定死信路由的key
return new Queue(MQConstant.DELAY_QUEUE, true, false, false, arguments);
}
// 绑定延迟队列
@Bean
public Binding delayQueueBinding() {
return BindingBuilder.bind(delayQueue()).to(defaultExchange()).with(MQConstant.DELAY_QUEUE);
}
}
五、创建消费者
//监听普通队列,有消息时进行消费
@Component
@RabbitListener(queues = MQConstant.HELLO_QUEUE)
@Slf4j
public class ReceiverMessage {
@RabbitHandler
public void process(String content) {
log.info("接受时间:"+ new Date());
log.info("接受消息:" + content);
}
}
//监听转发队列,有消息时,把消息转发到目标队列
@Component
@RequiredArgsConstructor
public class ReceiverDelayMessage {
private final MessageService messageService;
@RabbitHandler
@RabbitListener(queues = MQConstant.TRANSMIT_QUEUE)
public void process(String content) {
//此时,才把消息发送到指定队列,而实现延迟功能
TransmitMessage message = JSON.parseObject(content, TransmitMessage.class);
messageService.send(message.getQueueName(), message.getContent());
}
}
/**
* 转发消息载体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TransmitMessage implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
private String exchange;
private String queueName;
private String content;
private long times;
}
六、消息发送实现
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService{
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息到队列
* @param queueName 队列名称
* @param message 消息内容
*/
public void send(String queueName, String message) {
rabbitTemplate.convertAndSend(MQConstant.EXCHANGE,queueName, message);
}
/**
* 延迟发送消息到队列
* @param queueName 队列名称
* @param message 消息内容
*/
public void send(String queueName, String message, long times) {
//消息发送到延迟队列,当消息超时时,会路由到转发队列,转发队列根据下面封装的queueName,把消息转发到目的地队列
//发送前,把消息进行封装,转发时应转发到指定 queueName
TransmitMessage transmitMessage = new TransmitMessage(MQConstant.EXCHANGE, queueName, message, times);
MessagePostProcessor processor = new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration(times + "");
return message;
}
};
rabbitTemplate.convertAndSend(MQConstant.EXCHANGE,MQConstant.DELAY_QUEUE, JSON.toJSONString(transmitMessage),processor);
}
}
/**
* Rabbit消息队列相关常量
*/
public final class MQConstant {
private MQConstant(){}
// 交换机
public static final String EXCHANGE = "exchange";
// 延迟队列
public static final String DELAY_QUEUE = "delayQueue";
// 转发队列
public static final String TRANSMIT_QUEUE = "transmitQueue";
// 测试队列
public static final String HELLO_QUEUE = "hello";
}
七、延迟消息测试
@RequiredArgsConstructor
@Slf4j
@RestController
public class TestController {
private final MessageService messageService;
private final RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send() {
log.info("发送时间: " + new Date());
messageService.send(MQConstant.HELLO_QUEUE, "这个有点复杂哦...");
messageService.send(MQConstant.HELLO_QUEUE, "这个有点复杂哦...我是不是已经说过这句话了??", 5*1000);
}
}
RabbitMQ延迟插件
一、安装延迟插件
下载插件: Releases · rabbitmq/rabbitmq-delayed-message-exchange (github.com)
安装插件有两种方法
下载插件后自行上传到服务器并复制到RabbitMQ容器的Plugin文件夹下,比较复杂这里不赘述
使用dockerfile做一个镜像,在构建时直接把插件包含进去
下载好插件后上传至linux,同文件夹下在创建一个dockerfile(注意修改版本)
FROM rabbitmq:3.13.0-management
COPY ["rabbitmq_delayed_message_exchange-3.13.0.ez" , "/plugins/"]
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在此文件夹执行命令构建镜像并启动容器
docker build -t rabbitmq:3.8.2-management .
docker run -it -d --hostname my-rabbit --name rabbitmq -p 15672:15672 -p 5672:5672 -v /mydata/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin123456 rabbitmq:3.13.0-management
访问15627端口后台页面,发现有这个type就证明延迟插件安装成功了
二、创建延迟交换机、队列
/**
* 以下是延迟插件相关配置
*/
@Bean
public CustomExchange delayPluginExchange(){
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange (MQConstant.DELAY_EXCHANGE, "x-delayed-message",true, false, args);
}
//转发队列,中转站,将消息发送到真正的目的地
@Bean
public Queue transmitQueue() {
return new Queue(MQConstant.TRANSMIT_QUEUE, true, false, false);
}
@Bean
public Binding delayPluginQueueBinding() {
return BindingBuilder.bind(transmitQueue()).to(delayPluginExchange()).with(MQConstant.TRANSMIT_QUEUE).noargs();
}
三、创建消费者并定义发送消息方法
@Component
@RequiredArgsConstructor
@Slf4j
public class ReceiverDelayPluginMessage {
@RabbitHandler
@RabbitListener(queues = MQConstant.DELAY_PLUGIN_QUEUE)
public void pluginProcess(String content) {
log.info("接受时间:"+ new Date());
log.info("接受消息:" + content);
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageService{
private final RabbitTemplate rabbitTemplate;
/**
* 发送消息到延迟交换机
* @param message 消息内容
* @param times 延迟时间
*/
public void sendToDelayExchange(String queueName, String message, long times) {
// 消息直接发给延迟交换机,过期后推到转发队列,转发队列解析封装的TransmitMessage,将消息发到真正的目的地
// 多使用一层转发队列是为了灵活性,避免只能写死发到一个队列
TransmitMessage transmitMessage = new TransmitMessage(MQConstant.EXCHANGE, queueName, message, times);
rabbitTemplate.convertAndSend(MQConstant.DELAY_EXCHANGE, MQConstant.TRANSMIT_QUEUE, JSON.toJSONString(transmitMessage), processor -> {
// 给每条消息设置过期时间
processor.getMessageProperties().setDelayLong(times);
return processor;
});
}
}
/**
* 转发消息载体
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TransmitMessage implements Serializable {
@Serial
private static final long serialVersionUID = 1L;
private String exchange;
private String queueName;
private String content;
private long times;
}
/**
* Rabbit消息队列相关常量
*/
public final class MQConstant {
private MQConstant(){}
// 交换机
public static final String EXCHANGE = "exchange";
// 延迟交换机
public static final String DELAY_EXCHANGE = "delayExchange";
// 延迟队列
public static final String DELAY_QUEUE = "delayQueue";
// 转发队列
public static final String TRANSMIT_QUEUE = "transmitQueue";
// 测试队列
public static final String HELLO_QUEUE = "hello";
// 延迟插件队列
public static final String DELAY_PLUGIN_QUEUE = "delayPluginQueue";
}
四、测试延迟消息
@GetMapping("/send")
public void send() {
log.info("发送时间: " + new Date());
messageService.sendToDelayExchange("延迟插件还是好用啊~", 5*1000);
}