对于队列中存在的部分无法消费的消息(dead-lettered),RabbitMQ提供”死信队列”机制,实现死信的收集处理。死信队列还是普通的队列,只是其于某个队列绑定,当该队列出现私信时会将死信转发到该队列中,提供给该队列消费者处理。
当出现以下三种情况时,消息会变为死信:
- 消息超时未消费(TTL)。
- 消息被消费者拒绝(
nack
、reject
),且requeue
参数为false,即不重新入队。 - 消息队列中消息数达到上限,将新消息丢弃。
设置一个死信队列的基本流程为:
声明一个交换机和队列,作为死信交换机和队列
声明普通交换机和对应的队列,并绑定对应的死信交换机以及路由key(设置队列的
dead-letter-exchange
属性和x-dead-letter-routing-key
属性)1
2
3
4
5
6channel.exchangeDeclare("deadLetterex", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "deadLetterex");
args.put("x-dead-letter-routing-key", "deadLetterkey");
//设置队列属性
channel.queueDeclare("myqueue", false, false, false, args);
与其说死信队列,不如说是队列的一种特殊机制,将”死信”转发到实现设置好的交换机中。
Java实现测试
初始化死信队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public static void init() throws IOException, TimeoutException {
Channel c = RabbitMQUtils.getChannel();
//首先声明死信交换机和队列
c.exchangeDeclare(DLX, BuiltinExchangeType.DIRECT);
c.queueDeclare(DLX_QUE, false, false, false, null);c.queueBind(DLX_QUE, DLX, DLX_KEY);
//声明普通交换机和队列
c.exchangeDeclare(NORMAL_X, BuiltinExchangeType.DIRECT);
//设置dead-letter-exchange属性
Map<String, Object> properties = new HashMap<>();
properties.put("x-dead-letter-exchange", DLX);
properties.put("x-dead-letter-routing-key", DLX_KEY);
//声明普通队列
c.queueDeclare(NORMAL_QUE, false, false, false, properties);
c.queueBind(NORMAL_QUE, NORMAL_X, NORMAL_KEY);
}生产者和消费者代码较为简单,只展示消费者对于消息的处理逻辑
为了展示死信队列的效果,普通队列消费者会根据消息内容,进行拒绝和休眠
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
String[] allMsg = new String(message.getBody()).split(",");
long tag = message.getEnvelope().getDeliveryTag();
if(Integer.parseInt(allMsg[0]) == 1){
c.basicReject(tag, false);
}else{
try {
Thread.sleep(Long.parseLong(allMsg[1]) * 1000);
System.out.println("[normal] consume:" + tag
+ "after " + Long.parseLong(allMsg[1]) + "s get:" + allMsg[2]);
c.basicAck(tag, false);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
};死信队列消费者输出消息头中的
death_reason
属性
展示效果:
由于消息被拒绝,导致进入死信队列
- 输出原因为
rejected
- 输出原因为
超时导致进入死信队列(ttl设置为1s)
关闭消费者,输出原因为
expired
诡异的是如果一条消息被消费者接收了但未
ack
,队列中的消息并不会超时(第一条消息让消费者休眠20秒,后两条消息未超时)查看对应的消息状态为
Unacked
添加
c.basicQos(1)
代码,避免提前投递此时消息状态为
ready
正常超时
基于插件的延时队列实现
由于基于死信队列实现延时队列的队尾处理与超时处理的逻辑不一致,因此存在另一种通过插件实现延时队列的方式,通过安装rabbitmq_delayed_message_exchange
插件,在Rabbitmq中增加一种支持消息延迟的交换机,在消息超时后才将消息投递到对应的队列中。
下载对应版本插件到rabbitmq的plugins
目录中,安装插件并重启
通过web页面可以看到新增加了x-delayed-message
类型的交换机,需要设置消息投递方式
结论以及补充
TTL控制的是指消息
Ready
状态的时间,一旦被投送到消费者,转换为Unacked
状态由于队列排队消费的性质,队列中的消息即使超时,也要等到其到达队尾,才会进行处理(丢弃,进入死信队列等)
默认同时向消费者投送不限量的消息,即使最新的消息未
ack
可以对队列设置超时时间,也可以为每条消息单独设置超时时间,两者较小的作为最终的超时时间
When both a per-queue and a per-message TTL are specified, the lower value between the two will be chosen.