风在路上 风在路上
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)

风

摸鱼
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringMvc

  • RabbitMQ

    • RabbitMQ - 知识体系
    • 中间件介绍
    • 消息队列介绍
    • RabbitMQ - 安装
    • RabbitMQ - 简单案例
    • RabbitMQ - 发布确认
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
    • RabbitMQ - 发布确认高级-不可路由消息处理
    • RabbitMQ - 幂等性、优先级、惰性
    • 消息队列基础
    • 消息丢失
      • RabbitMQ
        • 两种机制
        • 1. 消息确认机制(producer)
        • 2. 消息签收机制(consumer)
        • 消息丢失的三种情况和解决方式
        • 1. 生产者发送消息到交换机丢失
        • 2. MQ收到消息,暂存内存中,还没进行消费,MQ服务挂掉,消息丢失
        • 3. MQ收到消息,消费者收到消息但还未处理,服务挂掉了,而消息是自动签收的,所以造成丢失
    • 重复消费
    • 顺序消费
  • Dubbo

  • SpringCloud

  • 框架
  • RabbitMQ
zdk
2022-09-16
目录

消息丢失

Table of Contents generated with DocToc (opens new window)

  • RabbitMQ
    • 两种机制
      • 1. 消息确认机制(producer)
      • 2. 消息签收机制(consumer)
    • 消息丢失的三种情况和解决方式
      • 1. 生产者发送消息到交换机丢失
      • 2. MQ收到消息,暂存内存中,还没进行消费,MQ服务挂掉,消息丢失
      • 3. MQ收到消息,消费者收到消息但还未处理,服务挂掉了,而消息是自动签收的,所以造成丢失

# RabbitMQ

# 两种机制

提示

RabbitMQ避免消息丢失的方法主要是利用消息确认机制和手动签收机制,所以有必要把这两个概念搞清楚

# 1. 消息确认机制(producer)

主要是生产者使用的机制,用来确认消息是否被成功消费


配置如下:

spring:
  rabbitmq:
    #设置消息确认的模式  发布消息成功到交换器后会触发回调方法
    publisher-confirm-type: correlated
1
2
3
4

提示

实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,就可以针对性地进行消息确认的日志记录,之后做进一步的消息发送补偿,以达到接近100%投递的目的。

/**
 * @author zdk
 * @date 2022/5/14 18:15
 * 消息回调
 */
@Slf4j
@Component
public class MessageCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    /**
     * 消息发送到交换机的回调
     * @param correlationData 消息相关数据
     * @param ack 消息是否被交换机确认了
     * @param cause 没被确认的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if (ack) {
            log.info("交换机已收到 id 为:{}的消息",id);
        }else{
            log.info("交换机未收到 id 为:{}的消息,原因是:{}",id,cause);
        }
    }

    /**
     * 消息未投递到队列的回调
     * @param returned 投递失败的消息和元数据
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {

    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

提示

这里其实对应了两种消息的丢失情况


第一个回调是处理:生产者发送消息到交换机(RabbitMQ Server)的时候因为网络或者其他原因导致的消息丢失


第二个回调是处理:交换机已收到生产者发送的消息,现在交换机需要把消息投递到对应的routingKey的队列中去,但实质上不存在满足条件的队列,消息投递就会失败,而如果我们仅开启生产者确认机制的情况下,交换机接收到消息后,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,所以需要告诉生产者消息投递队列失败。

# 2. 消息签收机制(consumer)

提示

RabbitMQ的消息是自动签收的,可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。


企业级开发中,RabbitMQ基本都需要开启手动签收方式,这样可以有效避免消息的丢失。


前面已经在生产者开启了手动签收机制,那么作为消费方,也要设置手动签收

spring:
  rabbitmq:
    host: 211.69.238.77
    port: 5672
    username: admin
    password: 123456
    listener:
      simple:
        acknowledge-mode: manual #开启手动签收
1
2
3
4
5
6
7
8
9

消费监听时,手动签收就一行代码,伪代码如下:

@RabbitListener(xxx)
public void onOrderMessage(@Payload Order order, Channel channel, 
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
    // ....
    // 手动签收
    channel.basicAck(tag, false);
}
1
2
3
4
5
6
7

# 消息丢失的三种情况和解决方式

# 1. 生产者发送消息到交换机丢失

提示

解决方式:在生产者端开启comfirm 确认模式,每次写的消息都会分配一个唯一的 id,


然后如果写入了 RabbitMQ 中,RabbitMQ 会给回传一个 ack 消息

# 2. MQ收到消息,暂存内存中,还没进行消费,MQ服务挂掉,消息丢失

提示

解决方式:MQ设置为持久化。将内存数据持久化到磁盘中

# 3. MQ收到消息,消费者收到消息但还未处理,服务挂掉了,而消息是自动签收的,所以造成丢失

提示

解决方式:开启消费者端的手动确认机制,在代码中手动调用api进行消费成功的确认

在 GitHub 上编辑此页 (opens new window)
#消息队列
最后更新: 2022/10/04, 16:10:00
消息队列基础
重复消费

← 消息队列基础 重复消费→

Theme by Vdoing | Copyright © 2022-2025 zdk | notes
湘ICP备2022001117号-1
川公网安备 51142102511562号
本网站由 提供CDN加速/云存储服务
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式