消息丢失
Table of Contents generated with DocToc (opens new window)
# RabbitMQ
# 两种机制
提示
RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签收机制,所以有必要把这两个概念搞清楚
# 1. 消息确认机制(producer)
主要是生产者使用的机制,用来确认消息是否被成功消费
配置如下:
spring:
rabbitmq:
#设置消息确认的模式 发布消息成功到交换器后会触发回调方法
publisher-confirm-type: correlated
2
3
4
提示
实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,就可以针对性地进行消息确认的日志记录,之后做进一步的消息发送补偿,以达到接近100%投递的目的。
/**
* @author zdk
* @date 2022/5/14 18:15
* 消息回调
*/
@Slf4j
@Component
public class MessageCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
/**
* 消息发送到交换机的回调
* @param correlationData 消息相关数据
* @param ack 消息是否被交换机确认了
* @param cause 没被确认的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if (ack) {
log.info("交换机已收到 id 为:{}的消息",id);
}else{
log.info("交换机未收到 id 为:{}的消息,原因是:{}",id,cause);
}
}
/**
* 消息未投递到队列的回调
* @param returned 投递失败的消息和元数据
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
提示
这里其实对应了两种消息的丢失情况
第一个回调是处理:生产者发送消息到交换机(RabbitMQ Server)的时候因为网络或者其他原因导致的消息丢失
第二个回调是处理:交换机已收到生产者发送的消息,现在交换机需要把消息投递到对应的routingKey的队列中去,但实质上不存在满足条件的队列,消息投递就会失败,而如果我们仅开启生产者确认机制的情况下,交换机接收到消息后,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,所以需要告诉生产者消息投递队列失败。
# 2. 消息签收机制(consumer)
提示
RabbitMQ的消息是自动签收的,可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。
企业级开发中,RabbitMQ基本都需要开启手动签收方式,这样可以有效避免消息的丢失。
前面已经在生产者开启了手动签收机制,那么作为消费方,也要设置手动签收
spring:
rabbitmq:
host: 211.69.238.77
port: 5672
username: admin
password: 123456
listener:
simple:
acknowledge-mode: manual #开启手动签收
2
3
4
5
6
7
8
9
消费监听时,手动签收就一行代码,伪代码如下:
@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
// ....
// 手动签收
channel.basicAck(tag, false);
}
2
3
4
5
6
7
# 消息丢失的三种情况和解决方式
# 1. 生产者发送消息到交换机丢失
提示
解决方式:在生产者端开启comfirm 确认模式,每次写的消息都会分配一个唯一的 id,
然后如果写入了 RabbitMQ 中,RabbitMQ 会给回传一个 ack 消息
# 2. MQ收到消息,暂存内存中,还没进行消费,MQ服务挂掉,消息丢失
提示
解决方式:MQ设置为持久化。将内存数据持久化到磁盘中
# 3. MQ收到消息,消费者收到消息但还未处理,服务挂掉了,而消息是自动签收的,所以造成丢失
提示
解决方式:开启消费者端的手动确认机制,在代码中手动调用api进行消费成功的确认