RabbitMQ - 死信队列
# 死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
# 死信的来源
消息 TTL 过期
TTL是Time To Live的缩写, 也就是生存时间
队列达到最大长度
队列满了,无法再添加数据到 mq 中
消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue=false.
# 死信实战
# 死信之TTl
生产者代码:
package com.zdk.deadQueue;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.zdk.utils.RabbitMQUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
/**
* @author zdk
* @date 2022/5/7 20:27
* 生产者
*/
public class Producer {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static final String NORMAL_ROUTING_KEY = "normal";
public static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
//声明普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
/*
普通队列将消息在三种情况下转到死信队列需要进行以下参数配置
*/
//为普通队列声明参数
Map<String, Object> arguments = new HashMap<>();
//指定它的死信队列
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//指定死信队列与死信交换机的routingKey
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
//声明普通队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
System.out.println("生产者准备发送消息......");
Scanner cin = new Scanner(System.in);
/*
* 1.情况一:消息处理超时从而进入死信队列
* 构建参数 消费者将消息过期时间设置为10秒
* 如果消息超过10秒没被确认消费,那么就会从普通队列移除 并加入到死信队列中
*/
AMQP.BasicProperties props =
new AMQP.BasicProperties()
.builder()
.expiration("10000")
.build();
while (cin.hasNext()) {
String message = cin.next();
//发送消息
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, props, message.getBytes(StandardCharsets.UTF_8));
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
普通消费者代码
package com.zdk.deadQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zdk.utils.RabbitMQUtils;
/**
* @author zdk
* @date 2022/5/7 20:26
* 普通队列消费者
*/
public class NormalQueueConsumer {
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println("普通队列消费者消费了消息:"+msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
System.out.println("普通队列消费者准备接收消息......");
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
启动生产者后,不启动普通队列消费者,然后发送消息,来模拟其接收不到消息导致消息处理超时的情况
这是初始情况:
发送两条消息后:
等待10s后:
死信消费者代码:
以上步骤完成后,启动死信消费者,它消费死信队列里面的消息
package com.zdk.deadQueue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zdk.utils.RabbitMQUtils;
/**
* @author zdk
* @date 2022/5/7 20:27
* 死信队列消费者
*/
public class DeadQueueConsumer {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
System.out.println("死信队列消费者消费了消息:"+msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
System.out.println("死信队列消费者准备接收消息......");
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> {});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 死信之最大长度
1、消息生产者代码去掉 TTL 属性
2、C1 消费者修改以下代码**(启动之后关闭该消费者 模拟其接收不到消息)**
//设置正常队列的长度限制,例如发5个,3个则为死信
arguments.put("x-max-length", 2);
1
2
2
注意此时需要把原先队列删除 因为参数改变了
3、死信队列消费者代码不变,启动它
# 死信之消息被拒
1、生产者代码同上不用修改
2、普通队列消费者修改一下 拒收消息 "garbage"
public class NormalQueueConsumer {
public static final String NORMAL_QUEUE = "normal_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
String msg = new String(message.getBody());
if (msg.equals("garbage")){
System.out.println("普通队列消费者拒收了消息:"+msg);
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else{
System.out.println("普通队列消费者消费了消息:"+msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
System.out.println("普通队列消费者准备接收消息......");
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
发送一个"正常"
,一个"garbage"
后
然后启动死信队列消费者:
在 GitHub 上编辑此页 (opens new window)
最后更新: 2022/09/16, 20:09:00