风在路上 风在路上
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)

风

摸鱼
首页
导航站
  • Java-Se

    • Java基础
  • Java-Se进阶-多线程

    • 多线程
  • Java-Se进阶-java8新特性

    • java8新特性
  • Java-ee

    • JavaWeb
  • Java虚拟机

    • JVM
  • golang基础

    • golang基础
  • golang框架

    • gin
  • SQL 数据库

    • MySQL
  • NoSQL 数据库

    • Redis
    • ElasticSearch
    • MongoDB
  • ORM

    • MyBatis
    • MyBatis-Plus
  • Spring

    • Spring
  • SpringMVC

    • SpringMVC1
    • SpringMVC2
  • SpringCloud

    • SpringCloud
  • 中间件

    • RabbitMQ
    • Dubbo
  • 秒杀项目
  • Git
  • Linux
  • Docker
  • JWT
  • 面试
  • 刷题
开发问题😈
设计模式
关于💕
归档🕛
GitHub (opens new window)
  • mybatis

  • mybatis-plus

  • Spring

  • SpringMvc

  • RabbitMQ

    • RabbitMQ - 知识体系
    • 中间件介绍
    • 消息队列介绍
    • RabbitMQ - 安装
    • RabbitMQ - 简单案例
    • RabbitMQ - 发布确认
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
      • 死信的概念
      • 死信的来源
      • 死信实战
        • 死信之TTl
        • 死信之最大长度
        • 死信之消息被拒
    • RabbitMQ - 延迟队列
    • RabbitMQ - 发布确认高级-不可路由消息处理
    • RabbitMQ - 幂等性、优先级、惰性
    • 消息队列基础
    • 消息丢失
    • 重复消费
    • 顺序消费
  • Dubbo

  • SpringCloud

  • 框架
  • RabbitMQ
zdk
2021-06-28
目录

RabbitMQ - 死信队列

  • 死信的概念
  • 死信的来源
  • 死信实战
    • 死信之TTl
    • 死信之最大长度
    • 死信之消息被拒

# 死信的概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

# 死信的来源

  • 消息 TTL 过期

    TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度

    队列满了,无法再添加数据到 mq 中

  • 消息被拒绝

    (basic.reject 或 basic.nack) 并且 requeue=false.

# 死信实战

RabbitMQ-00000048

# 死信之TTl

生产者代码:

package com.zdk.deadQueue;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.zdk.utils.RabbitMQUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * @author zdk
 * @date 2022/5/7 20:27
 * 生产者
 */
public class Producer {

    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";
    public static final String NORMAL_ROUTING_KEY = "normal";
    public static final String DEAD_ROUTING_KEY = "dead";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //绑定死信交换机与死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);

        /*
        普通队列将消息在三种情况下转到死信队列需要进行以下参数配置
         */
        //为普通队列声明参数
        Map<String, Object> arguments = new HashMap<>();
        //指定它的死信队列
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //指定死信队列与死信交换机的routingKey
        arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);

        //声明普通队列
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //绑定普通交换机与普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);


        System.out.println("生产者准备发送消息......");
        Scanner cin = new Scanner(System.in);

        /*
         * 1.情况一:消息处理超时从而进入死信队列
         * 构建参数 消费者将消息过期时间设置为10秒
         * 如果消息超过10秒没被确认消费,那么就会从普通队列移除 并加入到死信队列中
         */
        AMQP.BasicProperties props =
                new AMQP.BasicProperties()
                .builder()
                .expiration("10000")
                .build();

        while (cin.hasNext()) {
            String message = cin.next();
            //发送消息
            channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, props, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

普通消费者代码

package com.zdk.deadQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zdk.utils.RabbitMQUtils;

/**
 * @author zdk
 * @date 2022/5/7 20:26
 * 普通队列消费者
 */
public class NormalQueueConsumer {

    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println("普通队列消费者消费了消息:"+msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        System.out.println("普通队列消费者准备接收消息......");

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

启动生产者后,不启动普通队列消费者,然后发送消息,来模拟其接收不到消息导致消息处理超时的情况

这是初始情况:

image-20220507213514435

发送两条消息后:

image-20220507213547389

等待10s后:

image-20220507213602168

死信消费者代码:

以上步骤完成后,启动死信消费者,它消费死信队列里面的消息

package com.zdk.deadQueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.zdk.utils.RabbitMQUtils;

/**
 * @author zdk
 * @date 2022/5/7 20:27
 * 死信队列消费者
 */
public class DeadQueueConsumer {

    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println("死信队列消费者消费了消息:"+msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        System.out.println("死信队列消费者准备接收消息......");

        channel.basicConsume(DEAD_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

image-20220507213747962

# 死信之最大长度

1、消息生产者代码去掉 TTL 属性

image-20220507214001002

2、C1 消费者修改以下代码**(启动之后关闭该消费者 模拟其接收不到消息)**

image-20220507214119234

//设置正常队列的长度限制,例如发5个,3个则为死信
arguments.put("x-max-length", 2);
1
2

注意此时需要把原先队列删除 因为参数改变了

3、死信队列消费者代码不变,启动它

image-20220507214508038

# 死信之消息被拒

1、生产者代码同上不用修改

2、普通队列消费者修改一下 拒收消息 "garbage"

public class NormalQueueConsumer {
    public static final String NORMAL_QUEUE = "normal_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody());
            if (msg.equals("garbage")){
                System.out.println("普通队列消费者拒收了消息:"+msg);
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            }else{
                System.out.println("普通队列消费者消费了消息:"+msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };

        System.out.println("普通队列消费者准备接收消息......");

        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

发送一个"正常",一个"garbage"后

image-20220507215654787

image-20220507215712321

然后启动死信队列消费者:

image-20220507215736452

在 GitHub 上编辑此页 (opens new window)
#RabbitMQ#消息队列
最后更新: 2022/09/16, 20:09:00
RabbitMQ - 交换机
RabbitMQ - 延迟队列

← RabbitMQ - 交换机 RabbitMQ - 延迟队列→

Theme by Vdoing | Copyright © 2022-2025 zdk | notes
湘ICP备2022001117号-1
川公网安备 51142102511562号
本网站由 提供CDN加速/云存储服务
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式