重复消费
Table of Contents generated with DocToc (opens new window)
# 消息重复的场景
# 发送消息时重复
提示
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对生产者的确认应答失败。此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
# 投递时消息重复
提示
消息消费的场景下,消息已投递到消费者并完成业务处理,当消费者给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
# 负载均衡时消息重复
提示
包括但不限于网络抖动、Broker 重启以及消费者应用重启。当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
# 解决方案
# 利用幂等性
提示
既然在生产和消费过程中都有可能出现重复消费问题,那我们从消费的末端去处理,把识别出重复的消息,然后抛弃此消息,那不就能避免重复消息对业务的影响,这也就是幂等处理。
所谓幂等性,就是数据无论操作多少次,所产生的影响跟执行一次是一样的,比如对于读操作来说,无论读取多少次数据,都跟读取一次的数据是一样的,所以读操作是一个幂等性操作,而添加操作,添加多次会有多条记录,因而写操作则是非幂等性操作。因而对于以上场景,只要保证消息消费的幂等性,就能解决重复消费的问题。
# 1. 数据库唯一索引
可以通过给消息的某一些属性设置唯一约束,比如增加唯一uuid,添加的时候查询是否存对应的uuid,存在不操作,不存在则添加,那样对于相同的uuid只会存在一条数据。其实,只要类似“insert if not exist”的操作都可能,但需要保证查询跟添加的操作必须是原子性操作。例如:上面取款发短信的场景则可以借助redis的setnx实现。
public class SendServiceImpl implements SendService {
@Autowired
private JedisClient jedisClient;
@Value("channel")
private String channel;
@Override
public boolean sendMessage(Message message) {
String uuid = message.getUuid();
// 判断是否已经发送了
boolean send = jedisClient.setnx(channel, uuid) == 1;
if(send){
// TODO 开始发送短信
}
return true;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 2. 设置前置条件
在更新的时候,可以通过设置一定的前置条件来保证数据幂等,比如给用户发送短信是非幂等操作,但可以添加前置条件,变成如果该用户未发送过短信,则给用户发送短信,此时的操作则是幂等性操作。但在实际上,对于一个问题如何获取前置条件往往比较复杂,此时可以通过设置版本号version,没修改一次则版本号+1,在更新时则通过判断两个数据的版本号是否一致。
UPDATE message SET m_status = #{status} WHERE uuid = #{uuid} AND version = #{version}
# 3. 通过全局ID实现
最后的方式就比较暴力也比较通用,通过设置全局Id去实现。实现的思路是,在发送消息时,给每条消息指定一个全局唯一的 ID(可以通过雪花算法去实现),消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据。
虽然看起来好像不复杂,单机环境实现也比较简单,就是查询更新的思路,但在分布式环境上一点也不简单,因为必须保证查询跟更新是原子性的操作,不能查询完又有另外一个事务去更新了数据。当然,对于这种问题也可以通过分布式事务和分布式锁去实现,但与之的也降低了系统的性能。
# 4. 使用业务唯一标识key
# 生产者设置唯一标识的key值
提示
既然后要能识别重复消息,那必须是此条消息有唯一的标识。到了这里,你肯定会想到用RocketMQ生成的MessageId不就可以了吗?但这不是最佳方法,因为 MessageId有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以MessageId作为处理依据。而最好的方式是以业务唯一标识作为幂等处理的关键依据(如订单ID)。
业务的唯一标识可以在生产者通过消息 Key 设置。实现代码如下:
Message message = new Message();
message.setKey("ORDERID_001");
SendResult sendResult = producer.send(message);
2
3
# 消费者识别key相同的重复消息
提示
消费者收到消息时可以根据消息的 Key判断是否重复来实现消息幂等。这里我们用到了redis存放消息key值(因为redis读取快),并且对于key值大存放时长可以设置,超过了时长就会被清除掉
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 Key 做幂等处理
}
});
2
3
4
5
6
这里可以先判断key是否在redis中存在,如果存在则抛弃不走下面的业务逻辑;如果,在redis中没有查到,则继续下面的业务逻辑处理
# 5. 使用分布式锁
# 生产者端传递业务id
@Test
public void testSendMessage(){
//业务id
String id = UUID.randomUUID().toString();
//封装了业务id的消息元数据
CorrelationData correlationData = new CorrelationData(id);
//发送消息,并且携带消息的业务id
rabbitTemplate.convertAndSend("my_boot_topic_exchange",
"product.add",
"hello message",
correlationData
);
}
2
3
4
5
6
7
8
9
10
11
12
13
# 消费者端进行业务逻辑判断
/**
* 消费端的幂等性的实现
*/
@RabbitListener(queues = "my_boot_topic_queue")
public void processByMSG(Message message,Channel channel) throws IOException {
//如何获得消息的业务id
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation"); //设置分布式锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(messageId, 1,100000, TimeUnit.MILLISECONDS);
if(lock){
//做消费
System.out.println("添加用户成功");
//手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else{
//不做消费
System.out.println("已重复消费");
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19