0%

RabbitMQ死信队列

对于队列中存在的部分无法消费的消息(dead-lettered),RabbitMQ提供”死信队列”机制,实现死信的收集处理。死信队列还是普通的队列,只是其于某个队列绑定,当该队列出现私信时会将死信转发到该队列中,提供给该队列消费者处理。

当出现以下三种情况时,消息会变为死信:

  1. 消息超时未消费(TTL)。
  2. 消息被消费者拒绝(nackreject),且requeue参数为false,即不重新入队。
  3. 消息队列中消息数达到上限,将新消息丢弃。

设置一个死信队列的基本流程为:

  1. 声明一个交换机和队列,作为死信交换机和队列

  2. 声明普通交换机和对应的队列,并绑定对应的死信交换机以及路由key(设置队列的dead-letter-exchange属性和x-dead-letter-routing-key属性)

    1
    2
    3
    4
    5
    6
    channel.exchangeDeclare("deadLetterex", "direct");
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", "deadLetterex");
    args.put("x-dead-letter-routing-key", "deadLetterkey");
    //设置队列属性
    channel.queueDeclare("myqueue", false, false, false, args);

与其说死信队列,不如说是队列的一种特殊机制,将”死信”转发到实现设置好的交换机中。

Java实现测试

  1. 初始化死信队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public static void init() throws IOException, TimeoutException {
    Channel c = RabbitMQUtils.getChannel();
    //首先声明死信交换机和队列
    c.exchangeDeclare(DLX, BuiltinExchangeType.DIRECT);
    c.queueDeclare(DLX_QUE, false, false, false, null);c.queueBind(DLX_QUE, DLX, DLX_KEY);
    //声明普通交换机和队列
    c.exchangeDeclare(NORMAL_X, BuiltinExchangeType.DIRECT);
    //设置dead-letter-exchange属性
    Map<String, Object> properties = new HashMap<>();
    properties.put("x-dead-letter-exchange", DLX);
    properties.put("x-dead-letter-routing-key", DLX_KEY);
    //声明普通队列
    c.queueDeclare(NORMAL_QUE, false, false, false, properties);
    c.queueBind(NORMAL_QUE, NORMAL_X, NORMAL_KEY);
    }
  2. 生产者和消费者代码较为简单,只展示消费者对于消息的处理逻辑

    • 为了展示死信队列的效果,普通队列消费者会根据消息内容,进行拒绝和休眠

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      DeliverCallback deliverCallback = new DeliverCallback() {
      @Override
      public void handle(String consumerTag, Delivery message) throws IOException {
      String[] allMsg = new String(message.getBody()).split(",");
      long tag = message.getEnvelope().getDeliveryTag();
      if(Integer.parseInt(allMsg[0]) == 1){
      c.basicReject(tag, false);
      }else{
      try {
      Thread.sleep(Long.parseLong(allMsg[1]) * 1000);
      System.out.println("[normal] consume:" + tag
      + "after " + Long.parseLong(allMsg[1]) + "s get:" + allMsg[2]);
      c.basicAck(tag, false);
      } catch (InterruptedException e) {
      throw new RuntimeException(e);
      }
      }
      }
      };
    • 死信队列消费者输出消息头中的death_reason属性

展示效果:

  1. 由于消息被拒绝,导致进入死信队列

    • 输出原因为rejected
  2. 超时导致进入死信队列(ttl设置为1s)

    • 关闭消费者,输出原因为expired

    • 诡异的是如果一条消息被消费者接收了但未ack,队列中的消息并不会超时(第一条消息让消费者休眠20秒,后两条消息未超时)

    • 查看对应的消息状态为Unacked

    • 添加c.basicQos(1)代码,避免提前投递

      • 此时消息状态为ready

      • 正常超时

基于插件的延时队列实现

由于基于死信队列实现延时队列的队尾处理与超时处理的逻辑不一致,因此存在另一种通过插件实现延时队列的方式,通过安装rabbitmq_delayed_message_exchange插件,在Rabbitmq中增加一种支持消息延迟的交换机,在消息超时后才将消息投递到对应的队列中。

下载对应版本插件到rabbitmq的plugins目录中,安装插件并重启

通过web页面可以看到新增加了x-delayed-message类型的交换机,需要设置消息投递方式

结论以及补充

  1. TTL控制的是指消息Ready状态的时间,一旦被投送到消费者,转换为Unacked状态

  2. 由于队列排队消费的性质,队列中的消息即使超时,也要等到其到达队尾,才会进行处理(丢弃,进入死信队列等)

  3. 默认同时向消费者投送不限量的消息,即使最新的消息未ack

  4. 可以对队列设置超时时间,也可以为每条消息单独设置超时时间,两者较小的作为最终的超时时间

    When both a per-queue and a per-message TTL are specified, the lower value between the two will be chosen.