RabbitMQ官网教程RabbitMQ Tutorials中提供了六个入门案例,下面按照案例顺序逐个进行实践操作,了解RabbitMQ的基本应用
- “Hello World”:简单一对一队列
- “Work Queues”:一对多队列,模拟任务分配
- “Publish/Subscrible”:Fanout类型交换机,广播消息
- “Routing”:direct类型交换机,根据RoutingKey发送到对应队列
- “Topics”:topic类型交换机,根据RoutingKey的匹配规则发送到对应队列
- “RPC”:使用消息队列模拟RPC调用
- “Publish Confirms”:通过发送确认,实现发送端可靠性保证
为了基于Java操作RabbitMQ,需要加入对应的Client依赖
1 | <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> |
Hello World案例
该案例为基本的生产者消费者场景,一个生产者P向MQ中写入消息,一个消费者C从MQ中读取消息
生产者代码如下:
1 | public class Sender { |
消费者代码如下:
- 消费者同样需要声明队列,为了避免消费者运行先于生产者
1 | public class Receiver { |
运行后出现结果:
遇到的问题:新创建用户不具备visualhost”/“的权限,导致无法向队列写入消息
错误提示信息如下
在管理页面修改用户权限即可
Work Queues案例
此案例中消息的生产者相当于任务分配者,消费者相当于任务的执行者,分配者将任务发送到队列后不需要等待任务返回,执行者执行完毕后返回执行结果,相当于借助消息队列将同步操作变为异步操作,在处理耗费资源任务的场景中能够降低任务分配者阻塞的成本。
生产者发送20条消息到队列中,模拟发布20个任务:
1 | //向队列中不间断的发送20条消息 |
消费者采用多线程方式实现,两个线程消费消息并休眠,模拟执行任务
1 |
|
可以看到RabbitMQ采取Round robin
的方式,两个线程轮流消费消息
消息的ACK问题
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer.
为了保证消息正确的被消费,RabbitMQ只有在收到消费者的ACK确认后才会将消息从队列中删除,若由于特殊原因导致未收到ACK
- 消费者挂掉
- 超时未确认(默认30分钟)
RabbitMQ会将消息重新加入到队列头,交给另外的消费者进行消费
1 | //关闭自动确认 |
修改代码后,使用rabbitmqctl
查看未确认消息
公平分配的问题
Rabbit对于同一个队列的多个消费者消息分配采取round robin
的方式,虽然能够保证消息的均匀分配,但是如果消息对应任务的工作量不同,此时不同消费者的工作量就会不同,导致负载均衡失效。
官方教程中提供了一种方式,通过设置prefetchCount = 1
,只有在消费者ack历史消息后,才会将新的消息分配给他,此时消息分配机制类似于先到先得,对应代码如下:
prefetchCount
:指当消费者未确认的消息等于prefetchCount时,不再向消费者发送消息
1 | int prefetchCount = 1; |
修改代码,可以验证上述结果:
Publish/Subscribe案例
引入了exchange(交换机)的概念,消费者并不需要了解消息发送到哪个队列,只需要发送到指定的交换机,交换机根据其类型和队列路由key确定分发对象,exchange类型分为:
- 直连(direct):拿到消息后根据路由键是否等于对应队列的绑定键,直接将消息路由到对应队列。
- 扇形(Fanout):将消息发送到当前交换机绑定的所有队列(广播路由)。
- 主题(topic):通过将消息路由键与队列绑定模式匹配,将消息转发到一个或者多个匹配成功的队列(多播路由)。
- 头(head):将路由键从字符串变为了多值属性,多值匹配实现路由。
引入交换机概念后的消费者代码的逻辑为:
若不存在,则声明交换机
1
channel.exchangeDeclare("exchange_name", "exchange_type");
向交换机上生产消息
1
channel.basicPublish("exchange_name","route_key", 基本属性, message);
队列创建的逻辑:
声明队列
1
channel.queueDeclare(queue, durable, exclusive, autoDelete,arguments);
绑定交换机
1
channel.queueBind(queue, exchange, routingKe);
以fanout
即广播类型exchange为例,展示发布订阅案例
发布代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public class EmitLog {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare("log_exchange", "fanout");
Scanner scanner = new Scanner(System.in);
String curString = scanner.nextLine();
while(!curString.startsWith("end")){
//写入日志
channel.basicPublish("log_exchange","", null, curString.getBytes(StandardCharsets.UTF_8));
curString = scanner.nextLine();
}
System.out.println("exit");
}
}订阅代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class ReceiveLog {
private static class Receiver extends Thread{
int receiverNum;
public Receiver(int receiverNum){
this.receiverNum = receiverNum;
}
public void run(){
try {
Channel channel = RabbitMQUtils.getChannel();
//生成一个临时队列
String queName = channel.queueDeclare().getQueue();
channel.queueBind(queName, "log_exchange", "");
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.printf("receiver:%d receive log:%s\n",receiverNum, new String(message.getBody()));
}
};
channel.basicConsume(queName, deliverCallback, consumerTag -> {});
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
for(int i = 1;i < 3;i++){
new Receiver(i).start();
}
}
}结果为每发布一条消息,两个接收者均能够消费到消息
Routing
前面的代码其实已经覆盖了该部分示例,在创建que和exchange的绑定关系时指定routing key,当生产者发布消息时exchange(direct 类型)根据routing key发布到对应的队列中:
一个队列可以绑定多个routing key
多个队列可以绑定相同的routing key
综合示例
Topic
Topic类型交换机基于RoutingKey的匹配实现路由,其中routingkey闭幕满足word.word.word
的格式,每个词语之间通过.
分隔,当对应位置词语匹配时,将消息路由到对应队列,包括两个特殊类型的字符:
*
:匹配一个词语#
:匹配多个词语
下面举几个匹配例子:
*.*.rabbit
:可以匹配任意第三个词为rabbit
的路由键,例如quick.orange.rabbit
lazy.#
:可以匹配任意第一个词为lazy
的路由键,例如lazy.orange.rabbit
,lazy
,lazy.nb
了解之前以为按照正则表达式的方式匹配,实际上只是简单的点分字符匹配。
RPC
基于RabbitMQ的队列操作可以模拟实现RPC接口,基本实现逻辑如下图所示:
- RPC客户端将RPC请求参数写入队列,并监听返回队列
- RPC服务端将监听请求参数队列,将结果写入返回队列
要实现RPC需要解决以下问题:
RPC Server如何指导写入哪个返回队列?
固定的返回队列
在参数消息中携带返回队列名(replyTo属性)
1
replyTo = returnQue
返回队列是临时的还是固定的
固定的方便传输,不用重复创建->区分不同请求的响应结果->使用correlationId属性解决
1
correlationId = UUID.randomUUID()
每次创建一个临时队列
得到如下的案例代码
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public static byte[] rpcCall(String callQue, String returnQue, byte[] parameter) throws IOException, ExecutionException, InterruptedException {
if(returnQue == null){
returnQue = c.queueDeclare().getQueue();
}
String opID = String.valueOf(UUID.randomUUID());
//构建属性
BasicProperties properties = new BasicProperties.Builder().correlationId(opID).replyTo(returnQue)
.build();
c.queueDeclare(callQue, false, false, false, null);
//写入参数到请求队列
c.basicPublish("", callQue, properties, parameter);
final CompletableFuture<byte[]> response = new CompletableFuture<>();
DeliverCallback deliverCallback = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
if(opID.equals(message.getProperties().getCorrelationId())){
response.complete(message.getBody());
}
}
};
//监听返回队列
String ct = c.basicConsume(returnQue, true, deliverCallback, consumerTag -> {});
byte[] result = response.get();
c.basicCancel(ct);
return result;
}服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void run(){
try {
Channel c = RabbitMQUtils.getChannel();
//能者多劳
c.basicQos(1);
DeliverCallback d = new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
//返回结果(不能忘记添加操作编号)
c.basicPublish("", message.getProperties().getReplyTo(), new AMQP.BasicProperties.Builder().correlationId(message.getProperties().getCorrelationId()).build(),
String.valueOf(fbi(Integer.parseInt(new String(message.getBody())))).getBytes(StandardCharsets.UTF_8));
c.basicAck(message.getEnvelope().getDeliveryTag(), false);
System.out.printf("server:%d finish task %s\n",serverNum, new String(message.getBody()));
}
};
//监听请求队列
c.basicConsume(RPC_QUE, false, d, consumerTag -> {});
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
}
}
问题:公用一个队列,如果一个客户端消费了其他的返回消息,如何解决这个问题?
Publish Confirms
发布确认为了保证生产者发送的消息到达了broker,没有因为网络或者拥塞等原因丢弃,通过发布确认、队列持久化、消息持久化三个设置能够实现消息的可靠发布。
- 发布确认只是用来保证消息到达交换机,至于是否发送到队列,消息是否被正常消费并不在确认范围。
- 在发送消息时设置
mandatory=true
,当交换机找不到合适的队列投递消息,会将消息返回给生产者。
在发送消息前,通过confirmSelect
开启信道上的发布确认功能:
1 | Channel channel = connection.createChannel(); |
根据生产者确认是否同步,分为三种发布确认方式:
- 同步确认
- 批量同步确认
- 异步确认
这里的同步指的是生产者在等待确认时是否阻塞,并不是I/O中所谓的同步,更偏向于“阻塞”的概念
同步确认
生产者在发送消息后,等待消息确认后再进行下一步操作,代码如下:
1 | while(!inputStr.equals("end")){ |
其中相关的确认函数包括
OrDie
:如果任意一条发送的消息返回为nack
,直接抛出IOException
1 | boolean waitForConfirms() throws InterruptedException; |
批量确认
生产者发送一定量消息后,再进行整体确认,代码如下:
- 确认函数包括前面所有消息,当一条消息返回为
nack
,直接抛出IOException
- 问题是:无法知道哪条消息发送失败,一旦失败需要重发批中所有消息
1 | int bachCount = 10; |
异步确认
基于回调函数进行消息确认,代码如下:
- 基于消息的
sequence number
对每一条发布消息进行确认和处理 - 使用
NavigableMap
存储未确认的<sequence number, message>
,采用排序map的原因是为了解决累计确认(类似于TCP协议)的问题
1 | ConcurrentNavigableMap<Long, byte[]> confirmMap = new ConcurrentSkipListMap<>(); |
运行结果:
- 可见存在累计确认的情况
- 序列号顺序递增,可以使用循环缓冲区实现缓冲确认功能。
SpringBoot中使用发布确认
需要自定义回调函数ConfirmCallback
,并注入到RedisTemplate的对应接口方法中
1 |
|
同时在配置文件中开启发布确认:none
,simple
,correlated
(后两者区别暂时不知道)
1 | correlated = |
另外还可通过ReturnsCallback
回调函数在消息投递失败后,由Exchange将消息返回给生产者,需要在配置文件开启
1 | true = |
Consumer Confirms
RabbitMQ在消费端同样包括消息确认机制,分为三种确认模式:
basic.ack
:确认消息正常消费。basic.nack
:消息不正常消费,可指定消息重新入队/丢弃/进入死信队列。(negatively acknowledged)basic.reject
:nack的多条版本,拒绝小于等于指定delivertag
的所有消息。