RabbitMQ - 延迟队列
# 延迟队列介绍
- 延迟队列概念:
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
- 延迟队列使用场景:
1.订单在十分钟之内未支付则自动取消 2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。 3.用户注册成功后,如果三天内没有登陆则进行短信提醒。 4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。 5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:"订单十分钟内未支付则关闭",短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
# RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
- 队列设置TTL
在创建队列的时候设置队列的“x-message-ttl”属性
- 消息设置TTL
是针对每条消息设置TTL
两者的区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
# 整合 springboot
前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面, 成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
1、添加依赖:
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
3、修改配置文件
spring:
rabbitmq:
host: 211.69.238.77
port: 5672
username: admin
password: 123456
2
3
4
5
6
4、添加Swagger 配置类
package com.zdk.rabbitmqspringboot.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* @author zdk
* @date 2022/5/13 19:29
*/
@Configuration
@EnableSwagger2
public class Swagger2Config {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("RabbitMQ接口文档")
.description("RabbitMQ微服务接口定义")
.version("1.0")
.contact(new Contact("zdk", "https://github.com/hnistzdk", "369365576@qq.com"))
.build();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# 队列 TTL
- 代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:
这里要注意,两个普通队列与死信交换机的routingKey要和 死信队列与死信交换机的routingKey相同!
原先配置队列信息,写在了生产者和消费者代码中,现在可写在配置类中,生产者只发消息,消费者只接受消息
1、配置类 :
package com.zdk.rabbitmqspringboot.config;
import com.zdk.rabbitmqspringboot.constant.TtlConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author zdk
* @date 2022/5/13 20:06
* 1.一个生产者,向普通交换机投递消息
* 2.一个direct类型的普通交换机,它绑定两个普通队列,一个消息ttl为10s,一个为40s
* 3.然后再来一个type为direct的死信交换机,它绑定一个普通队列,
* 用于接收从上面两个普通队列消息过期后来的死信,
* 4.再来一个死信消费者
* 5.注意 两个普通队列与死信队列 这三个队列与死信交换机的routingKey应该是一样的
* 否则普通队列中过期的消息路由不到死信队列中
*/
@Configuration
public class TtlQueueConfig {
/**
* 声明普通交换机
* @return
*/
@Bean
public DirectExchange simpleExchange(){
return ExchangeBuilder
.directExchange(TtlConstant.SIMPLE_EXCHANGE)
.durable(true)
.build();
}
/**
* 声明死信交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return ExchangeBuilder
.directExchange(TtlConstant.DEAD_EXCHANGE)
.durable(true)
.build();
}
/**
* 普通队列1 ttl 10s
* @return
*/
@Bean
public Queue simpleQueue01(){
return QueueBuilder.durable(TtlConstant.SIMPLE_QUEUE01)
//指定这个队列的死信交换机
.deadLetterExchange(TtlConstant.DEAD_EXCHANGE)
//指定与死信交换机的routingKey
.deadLetterRoutingKey(TtlConstant.DEAD_EXCHANGE_ROUTING_KEY)
//设置消息过期时间
.ttl(10000)
.build();
}
/**
* 普通队列2 ttl 40s
* @return
*/
@Bean
public Queue simpleQueue02(){
return QueueBuilder.durable(TtlConstant.SIMPLE_QUEUE02)
//绑定这个队列的死信交换机
.deadLetterExchange(TtlConstant.DEAD_EXCHANGE)
//绑定与死信交换机的routingKey
.deadLetterRoutingKey(TtlConstant.DEAD_EXCHANGE_ROUTING_KEY)
//设置消息过期时间
.ttl(40000)
.build();
}
/**
* 死信队列:接收满足routingKey的死信交换机的消息
* @return
*/
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(TtlConstant.DEAD_QUEUE).build();
}
/**
* 普通队列1绑定到普通交换机
* @param simpleQueue01
* @param simpleExchange
* @return
*/
@Bean
public Binding simpleQueue01BindSimpleExchange(
@Qualifier("simpleQueue01") Queue simpleQueue01,
@Qualifier("simpleExchange") DirectExchange simpleExchange
){
return BindingBuilder
//队列
.bind(simpleQueue01)
//到交换机
.to(simpleExchange)
//routingKey
.with(TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_01);
}
/**
* 普通队列2绑定到普通交换机
* @param simpleQueue02
* @param simpleExchange
* @return
*/
@Bean
public Binding simpleQueue02BindSimpleExchange(
@Qualifier("simpleQueue02") Queue simpleQueue02,
@Qualifier("simpleExchange") DirectExchange simpleExchange
){
return BindingBuilder
//队列
.bind(simpleQueue02)
//到交换机
.to(simpleExchange)
//routingKey
.with(TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_02);
}
/**
* 死信队列绑定到死信交换机
* @param deadQueue
* @param deadExchange
* @return
*/
@Bean
public Binding deadQueueBindDeadExchange(
@Qualifier("deadQueue") Queue deadQueue,
@Qualifier("deadExchange") DirectExchange deadExchange
){
return BindingBuilder
//队列
.bind(deadQueue)
//到交换机
.to(deadExchange)
//routingKey
.with(TtlConstant.DEAD_EXCHANGE_ROUTING_KEY);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
2、消息生产者代码
package com.zdk.rabbitmqspringboot.controllers;
import cn.hutool.core.date.DateUtil;
import com.zdk.rabbitmqspringboot.constant.TtlConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author zdk
* @date 2022/5/13 20:45
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", DateUtil.now(), message);
rabbitTemplate.convertAndSend(TtlConstant.SIMPLE_EXCHANGE, TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_01, "消息来自 ttl 为 10S 的队列: "+message);
rabbitTemplate.convertAndSend(TtlConstant.SIMPLE_EXCHANGE, TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_02, "消息来自 ttl 为 40S 的队列: "+message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
3、消息消费者代码
package com.zdk.rabbitmqspringboot.consumer;
import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import com.zdk.rabbitmqspringboot.constant.TtlConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author zdk
* @date 2022/5/13 21:16
*/
@Slf4j
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = TtlConstant.DEAD_QUEUE)
public void receive(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", DateUtil.now(), msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
发起一个请求 http://localhost:8082/ttl/sendMsg/撒大大 (opens new window)
第一条消息在 10S 后变成了死信消息,然后被死信消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
# 延时队列TTL优化
在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间
配置文件类代码新增一个普通不设置ttl的队列,为其绑定死信交换机和普通交换机
@Configuration
public class MsgTtlQueueConfig {
/**
* 不设置消息过期时间的普通队列
* ttl由发送方指定
* @return
*/
@Bean
public Queue noTtlSimpleQueue(){
return QueueBuilder
.durable(TtlConstant.NO_TTL_SIMPLE_QUEUE)
.deadLetterExchange(TtlConstant.DEAD_EXCHANGE)
.deadLetterRoutingKey(TtlConstant.DEAD_EXCHANGE_ROUTING_KEY)
.build();
}
/**
* 绑定到普通交换机上
* @param noTtlSimpleQueue
* @param simpleExchange
* @return
*/
@Bean
public Binding noTtlSimpleQueueBindSimpleExchange(
@Qualifier("noTtlSimpleQueue") Queue noTtlSimpleQueue,
@Qualifier("simpleExchange") DirectExchange simpleExchange
){
return BindingBuilder.bind(noTtlSimpleQueue)
.to(simpleExchange)
.with(TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_03);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
生产者代码新增一个按给定时间参数设置消息的ttl的接口
@ApiOperation(value = "发送指定过期时间的消息")
@GetMapping("/sendTtlMsg/{message}/{ttl}")
public void sendTtlMsg(@PathVariable String message, @PathVariable String ttl){
log.info("当前时间:{},发送一条指定过期时间的信息给普通队列:{}", DateUtil.now(), message);
rabbitTemplate.convertAndSend(TtlConstant.SIMPLE_EXCHANGE, TtlConstant.SIMPLE_QUEUE_ROUTING_KEY_03, message,correlationData->{
//发送方设置消息过期时间
correlationData.getMessageProperties().setExpiration(ttl);
return correlationData;
}
);
}
2
3
4
5
6
7
8
9
10
11
发起请求
http://localhost:8082/ttl/sendTtlMsg/略略略1/9000 (opens new window)
http://localhost:8082/ttl/sendTtlMsg/略略略2/1000 (opens new window)
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,
如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这也就是为什么第二个延时1秒,却后执行
。
# Rabbitmq 插件实现延迟队列
上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
安装延时队列插件
可去我的腾讯微云 (opens new window) 下载rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录。
目录在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.11/plugins位置
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
#先通过ftp上传到这个目录下
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.11/plugins
#安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#重启服务
systemctl restart rabbitmq-server
2
3
4
5
6
基于插件
在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
这里从上面的,由发送方指定时间到死信队列,变为了通过这个支持延迟的交换机来对消息进行延迟
1、配置文件类代码:
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
package com.zdk.rabbitmqspringboot.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author zdk
* @date 2022/5/14 14:55
* 使用插件实现延时队列
*/
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
/**
* 声明自定义交换机
* @return
*/
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> args = new HashMap<>();
//指定延时交换机的类型 为direct
//这里要这样写BuiltinExchangeType.DIRECT.getType()
//因为这个的value必须要String
args.put("x-delayed-type", BuiltinExchangeType.DIRECT.getType());
//构造函数的参数里的type 是声明这个交换机的功能的类型 是一个延迟消息的类型
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,args);
}
/**
* 接收延时消息的队列
* @return
*/
@Bean
public Queue delayedQueue(){
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
/**
* 绑定延时交换机和队列
* @param delayedQueue
* @param delayedExchange
* @return
*/
@Bean
public Binding bindingDelayed(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange
){
return BindingBuilder
.bind(delayedQueue)
.to(delayedExchange)
.with(DELAYED_ROUTING_KEY)
.noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
2、生产者代码
@ApiOperation(value = "发送指定时间的消息到延迟队列")
@GetMapping("/sendMsg2DelayedExchange/{message}/{ttl}")
public void sendMsg2DelayedExchange(@PathVariable String message, @PathVariable Integer ttl){
log.info("当前时间:{},发送一条时长为 {} 的消息给延迟队列: {}", DateUtil.now(),ttl, message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg->{
//发送方设置消息延迟时间
msg.getMessageProperties().setDelay(ttl);
return msg;
}
);
}
2
3
4
5
6
7
8
9
10
11
12
3、消费者代码
/**
* 延时队列监听
* @param message
* @param channel
*/
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void delayReceive(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", DateUtil.now(), msg);
}
2
3
4
5
6
7
8
9
10
发送请求:
- http://localhost:8082/ttl//sendMsg2DelayedExchange/消息1/20000 (opens new window)
- http://localhost:8082/ttl//sendMsg2DelayedExchange/消息2/5000 (opens new window)
第二个消息被先消费掉了,符合预期
# 总结
笔记
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景