0%

RabbitMQ入门案例

RabbitMQ官网教程RabbitMQ Tutorials中提供了六个入门案例,下面按照案例顺序逐个进行实践操作,了解RabbitMQ的基本应用

  1. “Hello World”:简单一对一队列
  2. “Work Queues”:一对多队列,模拟任务分配
  3. “Publish/Subscrible”:Fanout类型交换机,广播消息
  4. “Routing”:direct类型交换机,根据RoutingKey发送到对应队列
  5. “Topics”:topic类型交换机,根据RoutingKey的匹配规则发送到对应队列
  6. “RPC”:使用消息队列模拟RPC调用
  7. “Publish Confirms”:通过发送确认,实现发送端可靠性保证

为了基于Java操作RabbitMQ,需要加入对应的Client依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>

Hello World案例

该案例为基本的生产者消费者场景,一个生产者P向MQ中写入消息,一个消费者C从MQ中读取消息

(P) -> [|||] -> (C)

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Sender {
private final static String QUEUE_NAME = "hello";
private final static String MQ_HOST = "192.168.190.100";

public static void main(String[] args)throws Exception {
//初始化连接并创建队列
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(MQ_HOST);
factory.setUsername("test_user");
factory.setPassword("123");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel()){
//创建队列
channel.queueDeclare(QUEUE_NAME,false, false, false, null);
//向默认交换器中发送消息,其中路由键等于队列名(因为默认交换器为直连型交换器)
channel.basicPublish("",QUEUE_NAME, null, "hello world".getBytes(StandardCharsets.UTF_8));
System.out.println("sender send message to " + QUEUE_NAME);
}
}
}

消费者代码如下:

  • 消费者同样需要声明队列,为了避免消费者运行先于生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Receiver {
private final static String QUEUE_NAME = "hello";
private final static String MQ_HOST = "192.168.190.100";

public static void main(String[] args)throws Exception {
//初始化代码和生产者相同
//省略。。。。。。
//等待消费到队列内容
System.out.println("Receiver is waiting for message");
//定义回调函数
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(new String(message.getBody()));
}
};
//开始消费(队列名,自动ACK,接收消息回调,拒绝消息回调函数)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

运行后出现结果:

遇到的问题:新创建用户不具备visualhost”/“的权限,导致无法向队列写入消息

  • 错误提示信息如下

  • 在管理页面修改用户权限即可

Work Queues案例

此案例中消息的生产者相当于任务分配者,消费者相当于任务的执行者,分配者将任务发送到队列后不需要等待任务返回,执行者执行完毕后返回执行结果,相当于借助消息队列将同步操作变为异步操作,在处理耗费资源任务的场景中能够降低任务分配者阻塞的成本。

Producer -> Queue -> Consuming: Work Queue used to distribute time-consuming tasks among multiple workers.

生产者发送20条消息到队列中,模拟发布20个任务:

1
2
3
4
5
6
7
//向队列中不间断的发送20条消息
for(int i = 1;i <= 20;i ++ ){
int workTime = new Random().nextInt(4) + 1;
channel.basicPublish("",QUEUE_NAME, null, String.format("%d %d", i, workTime).getBytes(StandardCharsets.UTF_8));
System.out.println("sender send message:"+ i+ " to" + QUEUE_NAME);
Thread.sleep(200);
}

消费者采用多线程方式实现,两个线程消费消息并休眠,模拟执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void run() {
System.out.println("worker:" + workerNum + " ready to work");
try {
//创建channel,并初始化队列
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false, false, false, null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
String[] work = new String(message.getBody()).split(" ");
System.out.println(String.format("[receive]worker:%d, get work %s", workerNum, work[0],work[1]));
try {
Thread.sleep(Integer.parseInt(work[1]) * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
} catch (IOException e) {
throw new RuntimeException(e);
}
}

可以看到RabbitMQ采取Round robin的方式,两个线程轮流消费消息

消息的ACK问题

If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer.

为了保证消息正确的被消费,RabbitMQ只有在收到消费者的ACK确认后才会将消息从队列中删除,若由于特殊原因导致未收到ACK

  1. 消费者挂掉
  2. 超时未确认(默认30分钟)

RabbitMQ会将消息重新加入到队列头,交给另外的消费者进行消费

1
2
3
4
//关闭自动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
//在回调函数中开启手动确认
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

修改代码后,使用rabbitmqctl查看未确认消息

公平分配的问题

Rabbit对于同一个队列的多个消费者消息分配采取round robin的方式,虽然能够保证消息的均匀分配,但是如果消息对应任务的工作量不同,此时不同消费者的工作量就会不同,导致负载均衡失效。

官方教程中提供了一种方式,通过设置prefetchCount = 1,只有在消费者ack历史消息后,才会将新的消息分配给他,此时消息分配机制类似于先到先得,对应代码如下:

  • prefetchCount:指当消费者未确认的消息等于prefetchCount时,不再向消费者发送消息
1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

修改代码,可以验证上述结果:

Publish/Subscribe案例

引入了exchange(交换机)的概念,消费者并不需要了解消息发送到哪个队列,只需要发送到指定的交换机,交换机根据其类型和队列路由key确定分发对象,exchange类型分为:

  1. 直连(direct):拿到消息后根据路由键是否等于对应队列的绑定键,直接将消息路由到对应队列。
  2. 扇形(Fanout):将消息发送到当前交换机绑定的所有队列(广播路由)。
  3. 主题(topic):通过将消息路由键与队列绑定模式匹配,将消息转发到一个或者多个匹配成功的队列(多播路由)。
  4. 头(head):将路由键从字符串变为了多值属性,多值匹配实现路由。

引入交换机概念后的消费者代码的逻辑为:

  1. 若不存在,则声明交换机

    1
    channel.exchangeDeclare("exchange_name", "exchange_type");
  2. 向交换机上生产消息

    1
    channel.basicPublish("exchange_name","route_key", 基本属性, message);

队列创建的逻辑:

  1. 声明队列

    1
    channel.queueDeclare(queue, durable, exclusive, autoDelete,arguments);
  2. 绑定交换机

    1
    channel.queueBind(queue, exchange, routingKe);

fanout即广播类型exchange为例,展示发布订阅案例

  1. 发布代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class EmitLog {
    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = RabbitMQUtils.getChannel();
    channel.exchangeDeclare("log_exchange", "fanout");

    Scanner scanner = new Scanner(System.in);
    String curString = scanner.nextLine();
    while(!curString.startsWith("end")){
    //写入日志
    channel.basicPublish("log_exchange","", null, curString.getBytes(StandardCharsets.UTF_8));
    curString = scanner.nextLine();
    }
    System.out.println("exit");
    }
    }
  2. 订阅代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public class ReceiveLog {
    private static class Receiver extends Thread{
    int receiverNum;
    public Receiver(int receiverNum){
    this.receiverNum = receiverNum;
    }
    @Override
    public void run(){
    try {
    Channel channel = RabbitMQUtils.getChannel();

    //生成一个临时队列
    String queName = channel.queueDeclare().getQueue();

    channel.queueBind(queName, "log_exchange", "");

    DeliverCallback deliverCallback = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
    System.out.printf("receiver:%d receive log:%s\n",receiverNum, new String(message.getBody()));
    }
    };
    channel.basicConsume(queName, deliverCallback, consumerTag -> {});

    } catch (IOException | TimeoutException e) {
    throw new RuntimeException(e);
    }

    }
    }
    public static void main(String[] args) {
    for(int i = 1;i < 3;i++){
    new Receiver(i).start();
    }
    }
    }
  3. 结果为每发布一条消息,两个接收者均能够消费到消息

Routing

前面的代码其实已经覆盖了该部分示例,在创建que和exchange的绑定关系时指定routing key,当生产者发布消息时exchange(direct 类型)根据routing key发布到对应的队列中:

  1. 一个队列可以绑定多个routing key

    Direct exchange routing

  2. 多个队列可以绑定相同的routing key

    Multiple Bindings

  3. 综合示例

    Final routing: putting it all together.

Topic

Topic类型交换机基于RoutingKey的匹配实现路由,其中routingkey闭幕满足word.word.word的格式,每个词语之间通过.分隔,当对应位置词语匹配时,将消息路由到对应队列,包括两个特殊类型的字符:

  1. *:匹配一个词语
  2. #:匹配多个词语

Topic Exchange illustration, which is all explained in the following text.

下面举几个匹配例子:

  1. *.*.rabbit:可以匹配任意第三个词为rabbit的路由键,例如quick.orange.rabbit

  2. lazy.#:可以匹配任意第一个词为lazy的路由键,例如

    lazy.orange.rabbit,lazy,lazy.nb

了解之前以为按照正则表达式的方式匹配,实际上只是简单的点分字符匹配。

RPC

基于RabbitMQ的队列操作可以模拟实现RPC接口,基本实现逻辑如下图所示:

  1. RPC客户端将RPC请求参数写入队列,并监听返回队列
  2. RPC服务端将监听请求参数队列,将结果写入返回队列

Summary illustration, which is described in the following bullet points.

要实现RPC需要解决以下问题:

  1. RPC Server如何指导写入哪个返回队列?

    • 固定的返回队列

    • 在参数消息中携带返回队列名(replyTo属性)

      1
      replyTo = returnQue
  2. 返回队列是临时的还是固定的

    • 固定的方便传输,不用重复创建->区分不同请求的响应结果->使用correlationId属性解决

      1
      correlationId = UUID.randomUUID()
    • 每次创建一个临时队列

得到如下的案例代码

  1. 客户端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public static byte[] rpcCall(String callQue, String returnQue, byte[] parameter) throws IOException, ExecutionException, InterruptedException {
    if(returnQue == null){
    returnQue = c.queueDeclare().getQueue();
    }
    String opID = String.valueOf(UUID.randomUUID());
    //构建属性
    BasicProperties properties = new BasicProperties.Builder().correlationId(opID).replyTo(returnQue)
    .build();
    c.queueDeclare(callQue, false, false, false, null);
    //写入参数到请求队列
    c.basicPublish("", callQue, properties, parameter);
    final CompletableFuture<byte[]> response = new CompletableFuture<>();
    DeliverCallback deliverCallback = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
    if(opID.equals(message.getProperties().getCorrelationId())){
    response.complete(message.getBody());
    }
    }
    };
    //监听返回队列
    String ct = c.basicConsume(returnQue, true, deliverCallback, consumerTag -> {});
    byte[] result = response.get();
    c.basicCancel(ct);
    return result;
    }
  2. 服务端

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Override
    public void run(){
    try {
    Channel c = RabbitMQUtils.getChannel();
    //能者多劳
    c.basicQos(1);
    DeliverCallback d = new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery message) throws IOException {
    //返回结果(不能忘记添加操作编号)
    c.basicPublish("", message.getProperties().getReplyTo(), new AMQP.BasicProperties.Builder().correlationId(message.getProperties().getCorrelationId()).build(),
    String.valueOf(fbi(Integer.parseInt(new String(message.getBody())))).getBytes(StandardCharsets.UTF_8));
    c.basicAck(message.getEnvelope().getDeliveryTag(), false);
    System.out.printf("server:%d finish task %s\n",serverNum, new String(message.getBody()));
    }
    };
    //监听请求队列
    c.basicConsume(RPC_QUE, false, d, consumerTag -> {});
    } catch (IOException | TimeoutException e) {
    throw new RuntimeException(e);
    }
    }

问题:公用一个队列,如果一个客户端消费了其他的返回消息,如何解决这个问题?

Publish Confirms

发布确认为了保证生产者发送的消息到达了broker,没有因为网络或者拥塞等原因丢弃,通过发布确认、队列持久化、消息持久化三个设置能够实现消息的可靠发布。

  • 发布确认只是用来保证消息到达交换机,至于是否发送到队列,消息是否被正常消费并不在确认范围。
  • 在发送消息时设置mandatory=true,当交换机找不到合适的队列投递消息,会将消息返回给生产者。

在发送消息前,通过confirmSelect开启信道上的发布确认功能:

1
2
Channel channel = connection.createChannel();
channel.confirmSelect();

根据生产者确认是否同步,分为三种发布确认方式:

  1. 同步确认
  2. 批量同步确认
  3. 异步确认

这里的同步指的是生产者在等待确认时是否阻塞,并不是I/O中所谓的同步,更偏向于“阻塞”的概念

同步确认

生产者在发送消息后,等待消息确认后再进行下一步操作,代码如下:

1
2
3
4
while(!inputStr.equals("end")){
c.basicPublish("", "pc_test", null, inputStr.getBytes(StandardCharsets.UTF_8));
c.waitForConfirmsOrDie(5000);
}

其中相关的确认函数包括

  • OrDie:如果任意一条发送的消息返回为nack,直接抛出IOException
1
2
3
4
5
6
7
8
boolean waitForConfirms() throws InterruptedException;

boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

void waitForConfirmsOrDie() throws IOException, InterruptedException;

void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

批量确认

生产者发送一定量消息后,再进行整体确认,代码如下:

  • 确认函数包括前面所有消息,当一条消息返回为nack,直接抛出IOException
  • 问题是:无法知道哪条消息发送失败,一旦失败需要重发批中所有消息
1
2
3
4
5
6
7
8
9
10
11
12
13
int bachCount = 10;
int curCount = 0;
while(!inputStr.equals("end")){
c.basicPublish("", "pc_test", null, inputStr.getBytes(StandardCharsets.UTF_8));
if(++curCount == bachCount){
c.waitForConfirmsOrDie(5000);
curCount = 0;
}
inputStr = scanner.next();
}
if(curCount > 0){
c.waitForConfirmsOrDie(5000);
}

异步确认

基于回调函数进行消息确认,代码如下:

  • 基于消息的sequence number对每一条发布消息进行确认和处理
  • 使用NavigableMap存储未确认的<sequence number, message>,采用排序map的原因是为了解决累计确认(类似于TCP协议)的问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
ConcurrentNavigableMap<Long, byte[]> confirmMap = new ConcurrentSkipListMap<>();
//关键回调代码
ConfirmCallback confirmCallback = new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
//multi表示大于等于当前序列号的均已确认
if(multiple){
ConcurrentNavigableMap<Long, byte[]> confirmed = confirmMap.headMap(deliveryTag, true);
confirmed.clear();
}else{
confirmMap.remove(deliveryTag);
System.out.println("confirm message:" + deliveryTag);
}
}
};
//两个回调函数:成功确认函数,失败确认函数
c.addConfirmListener(confirmCallback, new ConfirmCallback() {
@Override
public void handle(long deliveryTag, boolean multiple) throws IOException {
byte[] body = confirmMap.get(deliveryTag);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
new String(body), deliveryTag, multiple
);
confirmCallback.handle(deliveryTag, multiple);
}
});
while(!inputStr.equals("end")){
//记录未确认消息
confirmMap.put(c.getNextPublishSeqNo(), inputStr.getBytes(StandardCharsets.UTF_8));
c.basicPublish("", "pc_test", null, inputStr.getBytes(StandardCharsets.UTF_8));
inputStr = scanner.next();
}

运行结果:

  • 可见存在累计确认的情况
  • 序列号顺序递增,可以使用循环缓冲区实现缓冲确认功能。

SpringBoot中使用发布确认

需要自定义回调函数ConfirmCallback,并注入到RedisTemplate的对应接口方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class PCCallback implements RabbitTemplate.ConfirmCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if(b){
log.info("exchange receive message {} success",correlationData.getId());
}else{
log.info("exchange receive message {} failed for {}",correlationData.getId(), s);
}
}
}

同时在配置文件中开启发布确认:nonesimplecorrelated(后两者区别暂时不知道)

1
spring.rabbitmq.publisher-confirm-type=correlated

另外还可通过ReturnsCallback回调函数在消息投递失败后,由Exchange将消息返回给生产者,需要在配置文件开启

1
spring.rabbitmq.template.mandatory=true

Consumer Confirms

RabbitMQ在消费端同样包括消息确认机制,分为三种确认模式:

  1. basic.ack:确认消息正常消费。
  2. basic.nack:消息不正常消费,可指定消息重新入队/丢弃/进入死信队列。(negatively acknowledged)
  3. basic.reject:nack的多条版本,拒绝小于等于指定delivertag的所有消息。