风在路上 风在路上
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)

风

摸鱼
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringMvc

  • RabbitMQ

    • RabbitMQ - 知识体系
    • 中间件介绍
    • 消息队列介绍
    • RabbitMQ - 安装
    • RabbitMQ - 简单案例
    • RabbitMQ - 发布确认
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
    • RabbitMQ - 发布确认高级-不可路由消息处理
    • RabbitMQ - 幂等性、优先级、惰性
    • 消息队列基础
    • 消息丢失
    • 重复消费
      • 消息重复的场景
        • 发送消息时重复
        • 投递时消息重复
        • 负载均衡时消息重复
      • 解决方案
        • 利用幂等性
        • 1. 数据库唯一索引
        • 2. 设置前置条件
        • 3. 通过全局ID实现
        • 4. 使用业务唯一标识key
        • 生产者设置唯一标识的key值
        • 消费者识别key相同的重复消息
        • 5. 使用分布式锁
        • 生产者端传递业务id
        • 消费者端进行业务逻辑判断
        • 为什么redis-cluster在极端情况下不适合做分布式锁
    • 顺序消费
  • Dubbo

  • SpringCloud

  • 框架
  • RabbitMQ
zdk
2022-09-16
目录

重复消费

Table of Contents generated with DocToc (opens new window)

  • 消息重复的场景
    • 发送消息时重复
    • 投递时消息重复
    • 负载均衡时消息重复
  • 解决方案
    • 利用幂等性
      • 1. 数据库唯一索引
      • 2. 设置前置条件
      • 3. 通过全局ID实现
      • 4. 使用业务唯一标识key
        • 生产者设置唯一标识的key值
        • 消费者识别key相同的重复消息
      • 5. 使用分布式锁
        • 生产者端传递业务id
        • 消费者端进行业务逻辑判断
        • 为什么redis-cluster在极端情况下不适合做分布式锁

# 消息重复的场景

# 发送消息时重复

提示

当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对生产者的确认应答失败。此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

# 投递时消息重复

提示

消息消费的场景下,消息已投递到消费者并完成业务处理,当消费者给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

# 负载均衡时消息重复

提示

包括但不限于网络抖动、Broker 重启以及消费者应用重启。当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

# 解决方案

# 利用幂等性

提示

既然在生产和消费过程中都有可能出现重复消费问题,那我们从消费的末端去处理,把识别出重复的消息,然后抛弃此消息,那不就能避免重复消息对业务的影响,这也就是幂等处理。


所谓幂等性,就是数据无论操作多少次,所产生的影响跟执行一次是一样的,比如对于读操作来说,无论读取多少次数据,都跟读取一次的数据是一样的,所以读操作是一个幂等性操作,而添加操作,添加多次会有多条记录,因而写操作则是非幂等性操作。因而对于以上场景,只要保证消息消费的幂等性,就能解决重复消费的问题。

# 1. 数据库唯一索引

可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据。其实,只要类似“insert if not exist”的操作都可能,但需要保证查询跟添加的操作必须是原子性操作。例如:上面取款发短信的场景则可以借助redis的setnx实现。

public class SendServiceImpl implements SendService {

    @Autowired
    private JedisClient jedisClient;
    @Value("channel")
    private String channel;

    @Override
    public boolean sendMessage(Message message) {
        String uuid = message.getUuid();
        // 判断是否已经发送了
        boolean send = jedisClient.setnx(channel, uuid) == 1;
        if(send){
            // TODO 开始发送短信
        }
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 2. 设置前置条件

在更新的时候,可以通过设置一定的前置条件来保证数据幂等,比如给用户发送短信是非幂等操作,但可以添加前置条件,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,没修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。

UPDATE message SET m_status = #{status} WHERE uuid = #{uuid} AND version = #{version}
1

# 3. 通过全局ID实现

最后的方式就比较暴力也比较通用,通过设置全局Id去实现。实现的思路是,在发送消息时,给每条消息指定一个全局唯一的 ID(可以通过雪花算法去实现),消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据。


虽然看起来好像不复杂,单机环境实现也比较简单,就是查询更新的思路,但在分布式环境上一点也不简单,因为必须保证查询跟更新是原子性的操作,不能查询完又有另外一个事务去更新了数据。当然,对于这种问题也可以通过分布式事务和分布式锁去实现,但与之的也降低了系统的性能。

# 4. 使用业务唯一标识key

# 生产者设置唯一标识的key值

提示

既然后要能识别重复消息,那必须是此条消息有唯一的标识。到了这里,你肯定会想到用RocketMQ生成的MessageId不就可以了吗?但这不是最佳方法,因为 MessageId有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以MessageId作为处理依据。而最好的方式是以业务唯一标识作为幂等处理的关键依据(如订单ID)。


业务的唯一标识可以在生产者通过消息 Key 设置。实现代码如下:

Message message = new Message();
message.setKey("ORDERID_001");
SendResult sendResult = producer.send(message);
1
2
3
# 消费者识别key相同的重复消息

提示

消费者收到消息时可以根据消息的 Key判断是否重复来实现消息幂等。这里我们用到了redis存放消息key值(因为redis读取快),并且对于key值大存放时长可以设置,超过了时长就会被清除掉

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // 根据业务唯一标识的 Key 做幂等处理
    }
});
1
2
3
4
5
6

这里可以先判断key是否在redis中存在,如果存在则抛弃不走下面的业务逻辑;如果,在redis中没有查到,则继续下面的业务逻辑处理

# 5. 使用分布式锁

2331630-20210722075625778-244580759

# 生产者端传递业务id
@Test
public void testSendMessage(){
    //业务id
    String id = UUID.randomUUID().toString();
    //封装了业务id的消息元数据
    CorrelationData correlationData = new CorrelationData(id);
    //发送消息,并且携带消息的业务id
    rabbitTemplate.convertAndSend("my_boot_topic_exchange",
            "product.add",
            "hello message",
            correlationData
            );
}
1
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者端进行业务逻辑判断
/**
 * 消费端的幂等性的实现
 */
@RabbitListener(queues = "my_boot_topic_queue")
public void processByMSG(Message message,Channel channel) throws IOException {
    //如何获得消息的业务id
   String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");	//设置分布式锁
    Boolean lock = redisTemplate.opsForValue().setIfAbsent(messageId, 1,100000, TimeUnit.MILLISECONDS);
    if(lock){
        //做消费
        System.out.println("添加用户成功");
        //手动ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }else{
        //不做消费
        System.out.println("已重复消费");
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 为什么redis-cluster在极端情况下不适合做分布式锁

在 GitHub 上编辑此页 (opens new window)
#消息队列
最后更新: 2022/10/04, 16:10:00
消息丢失
顺序消费

← 消息丢失 顺序消费→

Theme by Vdoing | Copyright © 2022-2025 zdk | notes
湘ICP备2022001117号-1
川公网安备 51142102511562号
本网站由 提供CDN加速/云存储服务
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式