RabbitMQ扩展之交换器间的绑定

概要

AMQP-0-9-1中提供了queue.bind方法用于绑定一个队列到一个交换器,然后发送消息的时候,数据流总是先通过交换器(source)最终到达目标队列中(destination)。RabbitMQ实现了扩展,为交换器提供了一个exchange.bind方法用于绑定一个交换器到另一个交换器。交换器之间的绑定和队列与交换器的绑定在语义上是相同的:单向的、使用路由键和多种交换器类型。这一点允许使用者创建更丰富的路由拓扑。exchange.bind方法中的source和destination反映了消息的流向:从源(source)交换器到目标(destination)交换器。

queue.bind方法一样,可以在相同的绑定端点上创建多个不同的交换器绑定,例如:

  • exchange-source -> exchange-destination-1 -> queue-1
  • exchange-source -> exchange-destination-2 -> queue-2
  • exchange-source -> exchange-destination-3 -> queue-3

RabbitMQ在消息传递期间检测并消除循环,并确保在任何路由拓扑上传递给定路由的每个队列,每个队列将只接收该消息的一个副本。

使用了auto-delete参数声明的交换器只有它关联的所有绑定关系都移除(不管是交换器之间的绑定还是交换器和队列的绑定),它自身才会被删除。举个例子:

  • exchange-source -> exchange-destination -> queue-1

如果exchange-source被删除或者解除与exchange-destination的绑定关系同时exchange-destinationqueue-1解除绑定,而exchange-destination使用了auto-delete参数声明,那么exchange-destination就会被删除。

RabbitMQ中还提供了一个exchange.unbind方法进行交换器之间绑定关系的解除。

编码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ExchangeBindingMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDeclare("exchange.source", BuiltinExchangeType.DIRECT, true, false, null);
channel.exchangeDeclare("exchange.destination", BuiltinExchangeType.DIRECT, true, true, null);
channel.queueDeclare("exchange.binding.queue", true, false, false, null);
channel.exchangeBind("exchange.destination", "exchange.source", "exchange.routingKey");
channel.queueBind("exchange.binding.queue", "exchange.destination", "exchange.routingKey");
channel.basicPublish("exchange.source", "exchange.routingKey", MessageProperties.BASIC, "message".getBytes(StandardCharsets.UTF_8));
channel.exchangeUnbind("exchange.destination", "exchange.source", "exchange.routingKey");
// channel.exchangeDelete("exchange.source");
channel.queueUnbind("exchange.binding.queue", "exchange.destination", "exchange.routingKey");
});
}
}

在解除exchange.sourceexchange.destinationexchange.binding.queueexchange.destination之间的绑定后,exchange.destination会自动删除。

(本文完 e-a-20181218 c-1-d)

前提

本文内容参考RabbitMQ官方文档Direct reply-to

直接回复

直接回复(Direct reply-to)是一种可以避免声明回复队列并且实现类似于RPC功能的一种特性。RabbitMQ中允许使用客户端和RabbitMQ消息代理中间件实现RPC模式,典型的做法是:RPC客户端发送请求(消息)到一个持久化的已知服务端队列,RPC服务端消费该服务端队列的消息,然后使用消息属性中的reply-to属性对应的值作为客户端回复队列发送回复消息到RPC客户端。

客户端回复队列需要考虑创建问题。客户端可以为每个请求-响应声明一个一次性的队列,但是这样的做法是十分低效的,因为即使是非持久状态下的非镜像队列,其删除的代价是昂贵的,特别是在集群模式之下。另一个可选的做法是:客户端为回复创建一个持久化的长期存在的队列,这种情况下队列的管理可能变得复杂,因为客户端本身可能不是长期存在的。

r-m-d-r-1.png

实际上,RabbitMQ提供了一个功能,允许RPC客户端直接从其RPC服务端接收回复,并且无需创建回复队列,依赖于RabbitMQ的消息中间件的功能,具体做法是:

对于RPC客户端:

  • RPC客户端创建消费者的时候队列指定为伪队列amq.rabbitmq.reply-to,使用非手动ack模式(autoAck=true)进行消费,伪队列amq.rabbitmq.reply-to不需要显式声明,当然如果需要的话也可以显式声明。
  • 发布消息的时候,消息属性中的reply-to属性需要指定为amq.rabbitmq.reply-to

对于RPC服务端:

  • RPC服务端接收消息后感知消息属性中的reply-to属性存在,它应该通过默认的交换器(名称为"")和reply-to属性作为路由键发送回复消息,那么该回复消息就会直接投递到RPC客户端的消费者中。
  • 如果RPC服务端需要进行一些长时间的计算逻辑,可能需要探测RPC服务端是否存活,可以使用一个一次性使用的信道对reply-to属性做一次队列声明,如果声明成功,队列amq.rabbitmq.reply-to并不会创建,如果声明失败,那么说明客户端已经失去连接。

注意事项:

  • RPC客户端在创建伪队列amq.rabbitmq.reply-to消费者的时候必须使用非手动ack模式(autoAck=true)。
  • 使用此机制发送的回复消息通常不具有容错能力,如果发布原始请求的客户端随后断开连接,它们将被丢弃。
  • 伪队列amq.rabbitmq.reply-to可以在basic.consumebasic.publish和消息属性reply-to中使用,实际上,它并不是一个真实存在的队列,RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息。

说实话,个人认为这种方式有个比较多的局限性:

  • 同一个应用里面,只能使用唯一一个伪队列amq.rabbitmq.reply-to消费回复消息,并且RabbitMQ的Web管理器或者rabbitmqctl list_queues命令都无法展示该伪队列的相关属性或者信息,也就是无法对它进行监控或者管理。
  • 对于多应用同时接进去同一个RabbitMQ消息中间件代理,这些应用之间无法同时使用amq.rabbitmq.reply-to这个特性,因为有可能A客户端发送的消息被远程服务回调到另一个不同的B客户端。

直接回复特性使用

使用伪队列amq.rabbitmq.reply-to的一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ReplyToRawMain extends BaseChannelFactory {

private static final String FAKE_QUEUE = "amq.rabbitmq.reply-to";
private static final String RPC_QUEUE = "rpc.queue";
private static final String DEFAULT_EXCHANGE = "";

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare(RPC_QUEUE, true, false, false, null);
client(channel);
server(channel);
Thread.sleep(5000);
});
}

private static void client(Channel channel) throws Exception {
// 客户端消费 - no-ack,也就是autoAck = true
channel.basicConsume(FAKE_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});
// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(FAKE_QUEUE)
.build();
channel.basicPublish(DEFAULT_EXCHANGE, RPC_QUEUE, basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));
}

private static void server(Channel channel) throws Exception {
// 服务端消费
channel.basicConsume(RPC_QUEUE, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nexchange:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getExchange(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答->客户端
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish(DEFAULT_EXCHANGE, properties.getReplyTo(), basicProperties, body);

}
});
}
}

当然,可以直接创建一个真实的独占队列(生命周期跟客户端的连接绑定)作为回复队列,举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ReplyToMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 服务端队列
channel.queueDeclare("rpc.queue", true, false, false, null);

// 客户端接收应答队列 - 排他队列,生命周期和连接绑定
AMQP.Queue.DeclareOk callback = channel.queueDeclare("", false, true, false, null);

System.out.println("建立排他应答队列:" + callback.getQueue());

// 客户端消费
channel.basicConsume(callback.getQueue(), false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Client]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
}
});

// 服务端消费
channel.basicConsume("rpc.queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println(String.format("[X-Server]\ndeliveryTag:%s\nroutingKey:%s\ncorrelationId:%s\nreplyTo:%s\ncontent:%s\n",
envelope.getDeliveryTag(), envelope.getRoutingKey(), properties.getCorrelationId(),
properties.getReplyTo(), new String(body, StandardCharsets.UTF_8)));
// 服务端应答
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", properties.getReplyTo(), basicProperties, body);
}
});

// 客户端发送
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
.correlationId("message-99999")
.replyTo(callback.getQueue())
.build();
channel.basicPublish("", "rpc.queue", basicProperties, "Reply Message".getBytes(StandardCharsets.UTF_8));

Thread.sleep(5000);
});
}
}

个人想法

在实际项目中,我们经常被RabbitMQ消息发送是否成功这个问题困扰,一般情况下,我们认为调用basic.publish只要不抛出异常就是发送消息成功,例如一个代码模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
public boolean sendMessage(){
boolean success = false;
try {
channel.basicPublish();
// 发送成功
success = true;
}catch (Exception e){
// 发送失败
log.error();
}
return success;
}

这个代码模板在极大多数情况下是合适的,但是有些时候我们确实需要消息的接收方告知发送方已经收到消息,这个时候就需要用到消息的回复功能,个人认为可选的方案有:

  • 消息发布方基于伪队列amq.rabbitmq.reply进行消费,消息接收方回复到伪队列amq.rabbitmq.reply上。
  • 消息发布方自定义独占队列进行消费,消息接收方回复到此独占队列。
  • 消息发布方自定义持久化队列进行消费,消息接收方回复到此持久化队列。

其实,AMQP.BasicProperties的replyTo属性中指定需要回复的队列名只是RabbitMQ提出的一种规约或者建议,并不是强制实行的方案,实际上可以自行选择回复队列或者忽略replyTo属性

前提

本文来源于官方文档Consumer Priorities

消费者优先级

消费者优先级的机制:

  • 高优先级的消费者处于活跃状态的情况下优先接收和处理消息。
  • 消息会流入到低优先级的活跃消费者仅当高优先级的消费者处于阻塞状态。

正常情况下,所有订阅同一个队列的活跃消费者以循环的(round-robin)方式从队列中接收消息。当使用了消费者优先级,如果多个活跃消费者使用了相同的高优先级属性,那么消息投递也是以循环的方式进行(其实使用了相同的优先级类似于没有启用优先级)。

活跃消费者的定义

活跃的消费者就是可以在不用等待的情况下接收和处理消息的消费者,也就是消费者如果无法接收消息,那么它就是出于非活跃状态(或者说阻塞状态),阻塞的常见原因有:

  • 使用了basic.qos之后,消费者在信道中未确认的预读取消息达到了上限。
  • 网络阻塞。

因此,对于每个存在的队列,必定至少出现下面三种情况的其中一种:

  • 队列没有活跃的消费者。
  • 队列是空的。
  • 队列正在忙于向消费者投递消息。

消费者可能在一秒内多次在活跃和阻塞状态之间切换,只要消费处理速度足够快。RabbitMQ不会通过Web管理插件或者rabbitmqctl命令公开消费者当前是活跃还是阻塞状态,换言之,只能通过客户端感知。

启用消费者优先级的时候,RabbitMQ会优先投递消息到优先级属性比较高的消费者,但是如果所有优先级高的消费者都处于阻塞状态,RabbitMQ会把消息投递到活跃的优先级稍低的消费者,而不是一直等待优先级高的消费者解除阻塞,造成优先级低的消费者一直处于饥饿状态。

使用消费者优先级特性

在使用basic.consume方法可以设置参数x-priority的值为整数,数字越大则优先级越高,未设置则使用默认值0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ConsumerPriorityMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
Map<String, Object> consumerArgs = new HashMap<>(8);
consumerArgs.put("x-priority", 10);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
consumerArgs.put("x-priority", 100);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
});
}
}

上面的例子设置了两个消费者,后者的优先级为100,而前者的优先级为10。

前提

本文来源于官方文档Consumer Prefetch

消费者消息预读取

消费者消息预读取是一个更加合理和高效的限制未确认消息数量的解决方式。

AMQP 0-9-1协议中定义了basic.qos方法用于限制信道或者连接上的未确认消息数量,这个消息数据量命名为prefetch_count。不幸的是,信道其实并不是限制未确认消息数量的理想范畴,因为单个信道有可能有多个消费者订阅多个不同的队列,所以信道和队列需要为发送的每个消息相互协调,以确保消息总数量不超过限制,造成了性能下降,单机性能出现瓶颈,在集群方案中耗时更加严重。

basic.qos定义了两个属性:

  • prefetch_count:预读取消息的数量。
  • global:是否全局的。

在许多情况下,指定每个消费者的预读取消息数量更加合理。因此,RabbitMQ在basic.qos方法中重新定义了global标志的含义:

global的值 prefetch_count在AMQP 0-9-1中的含义 prefetch_count在RabbitMQ中的含义
false 同一个信道上的消费者共享 单独应用于信道上的每个新消费者
true 所有消费者基于同一个连接共享 同一个信道上的消费者共享

basic.qos方法在RabbitMQ的Java驱动中对应三个方法:

1
2
3
4
5
6
7
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0
void basicQos(int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0 , global = false
void basicQos(int prefetchCount) throws IOException;
  • prefetchSize:预读取的消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限。
  • prefetchCount:预读取的消息数量上限,0表示无上限。
  • global:false表示prefetchCount单独应用于信道上的每个新消费者,true表示prefetchCount在同一个信道上的消费者共享。

限制单个消费者

1
2
3
4
5
6
7
8
9
public class BasicQosSingle extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicQos(10); //基于消费者进行限制
channel.basicConsume("throwable.queue.direct",new DefaultConsumer(channel){});
});
}
}

此消费者最多只能有10条预读取的未确认的消息。

独立限制多个消费者

基于同一个信道对多个队列建立不同的消费者:

1
2
3
4
5
6
7
8
9
10
11
12
public class BasicQosMulti extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
channel.basicQos(10); //基于消费者进行限制
channel.basicConsume("throwable.queue.direct",consumer1);
channel.basicConsume("throwable.queue.fanout",consumer2);
});
}
}

每个费者最多只能有10条预读取的未确认的消息。

基于共享限制多个消费者

AMQP规范没有解释如果使用不同的global多次调用basic.qos会发生什么,RabbitMQ将此解释为意味着两个预取限制应该彼此独立地强制执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
public class BasicQosShare extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
channel.basicQos(10, false); //基于消费者进行限制
channel.basicQos(15, true); //基于信道进行限制
channel.basicConsume("throwable.queue.direct",consumer1);
channel.basicConsume("throwable.queue.fanout",consumer2);
});
}
}

上面的代码表示:

  • 两个消费者consumer1和consumer2基于信道最多只能有15条未确认的预读取消息。
  • 消费者consumer1和consumer2自身最多只能有10条未确认的预读取消息。

也就是有双重限制,这种限制需要信道和队列之间协调,会耗费额外的性能。

消息预读取的意义

消息预读取可以理解为RabbitMQ Broker把未确认的消息批量推送到RabbitMQ的Java客户端中,由客户端先缓存这些消息,然后投递到消费者中。试想,如果在推模式下,没有消息预读取功能,RabbitMQ Broker每次投递一条消息到客户端消费者中,这样就会产生大量的IO操作,导致性能下降,此外,消费者处理速度有可能比较快,容易产生消费者饥饿的情况。可以根据消费者实际的消费速度和消息发布的速度,对消费者的预读取未确认消息的上限进行配置,这样在大多数场景下可以提高消费者的性能。

前提

本文来源于官方文档Consumer Cancel Notification

消费者取消通知

当一个信道上建立的消费者订阅了一个队列,有可能出现各种原因导致消费停止。一个很明显的原因就是客户端在同一个信道上发出basic.cancel命令,消息中间件代理响应basic.cancel-ok,将会导致消费者被取消。还有其他的事件如队列的删除或者集群方案所在队列的集群节点失败也有可能导致消费者被取消,消费者被取消这个事件并不会通知客户端对应的信道,这样子会造成客户端无法感知消费者被取消

为了避免上面这些情况出现,RabbitMQ引入了扩展特性:由于消息中间件代理出现的异常或者正常情况导致消费者取消,会向对应的消费者(信道)发送basic.cancel,但是由客户端信道主动向消息中间件代理发送basic.cancel以取消消费者的情况下不会受到消息中间件代理的basic.cancel回复。

有些情况下,客户端感知到异常(例如队列删除等)主动向消息中间件代理发送basic.cancel,这个时候,消息中间件代理也有可能因为队列删除主动向对应的消费者(信道)发送basic.cancel,也就是存在竞争,RabbitMQ代理收到前者的basic.cancel时不会出现异常,基于后者还是正常回复basic.cancel-ok

举个例子,情况一:例如我们主动取消信道上的消费者

1
2
3
4
5
6
7
8
9
10
11
12
public class InitiativeBasicCancel extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
// 此方法返回的是消费者的标签
String consumerTag = channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {

});
channel.basicCancel(consumerTag);
});
}
}

情况二:假如我们想监听消息中间件代理异步回调的basic.cancelbasic.cancel-ok,应该这样做

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class AsyncBasicCancel extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {

@Override
public void handleCancelOk(String consumerTag) {
System.out.println("收到来自消息中间件代理的basic.cancel-ok回复,consumerTag=" + consumerTag);
}

@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("收到来自消息中间件代理的basic.cancel回复,consumerTag=" + consumerTag);
}
});
});
}
}

一般情况下,我们应该同时考虑情况一和情况二有可能同时发生(也就是前面说到的竞争),并且做好相应的处理即可。

前提

前一篇文章介绍到RabbitMQ相关组件的声明,组件声明完成之后,就可以发送消息和消费消息,消费消息的时候需要考虑消息的确认。

消息的发送

消息的发送只依赖于交互器(名称)、可选路由键和可选的Header参数,可选路由键和Header可以认为是路由参数。因为RabbitMQ有四种内建的交换器,加上特殊的默认交换器可以认为有五种,这里列举一下通过这五种交换器发送消息需要的参数:

交换器类型 路由参数
默认交换器(AMQP default) 交换器名称(空字符串)和队列名称
Direct交换器 交换器名称和路由键
Fanout交换器 交换器名称(API中必须提供路由键,可以随意输入)
Topic交换器 交换器名称和路由键
Headers交换器 交换器名称和Header参数(API中必须提供路由键,可以随意输入)

消息的发布依赖于Channel的basicPublish方法,按照惯例查看其重载方法中参数列表长度最大的方法:

1
2
3
4
5
6
void basicPublish(String exchange, 
String routingKey,
boolean mandatory,
boolean immediate,
BasicProperties props,
byte[] body) throws IOException;
  • exchange:交换器名称。
  • routingKey:路由键。
  • mandatory:是否强制的,如果此属性设置为true,消息发布的时候如果根据exchange和routingKey无法找到可达的目标队列,会调用AMQP方法basic.return将该消息返回给消息发布者;如果此属性设置为false,出现上面的情况,消息会被消息中间件代理直接丢弃。
  • immediate:是否立即的,如果此属性设置为true,消息通过exchange和routingKey找到目标队列(一个或者多个),如果所有的目标队列都没有消费者,那么会调用AMQP方法basic.return将该消息返回给消息发布者。
  • props:BasicProperties类型,消息属性或者叫消息元数据,com.rabbitmq.client.MessageProperties已经提供了一些列的实现,如果不满足可以使用BasicProperties.Builder自行构建。
  • body:字节数组类型,消息的有效负载,一般我们说的消息或者消息体就是指这个。

值得注意的是:immediate属性在RabbitMQ-3.0版本已经被移除,具体原因是:

r-m-s-c-c-1.png

翻译一下就是:immediate属性原有的功能对于基础代码的复杂性太高,特别是在镜像队列的条件下。它还影响到镜像队列的性能优化,推荐使用TTL(Time To Live,队列消息过期特性)或者DLX(Dead Letter Exchange,死信交换器)替代。在RabbitMQ-3.0后的版本如果immediate设置为true,会抛异常。

举个消息发送的例子(下面的例子中,每次发送之前都声明交换器、队列和绑定,实际上我们不需要这样操作,如果依赖于Servlet容器,可以在容器启动之后做一次声明即可):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MessageSendMain extends BaseChannelFactory {

private static final String DEFAULT_EXCHANGE = "";

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//使用Direct类型的交换器
channel.queueDeclare("throwable.queue.direct", true, false, false, null);
channel.exchangeDeclare("throwable.exchange.direct", BuiltinExchangeType.DIRECT, true, false, null);
channel.queueBind("throwable.queue.direct", "throwable.exchange.direct", "direct.routingKey", null);
//发送"Direct Message"到队列throwable.queue.direct
channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));

//其实也可以通过默认的交换器直接发送消息到队列throwable.queue.direct
channel.basicPublish(DEFAULT_EXCHANGE, "throwable.queue.direct",
MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));

//使用Fanout类型的交换器
channel.queueDeclare("throwable.queue.fanout", true, false, false, null);
channel.exchangeDeclare("throwable.exchange.fanout", BuiltinExchangeType.FANOUT, true, false, null);
//这里路由键随便写,或者用空字符串也可以
channel.queueBind("throwable.queue.fanout", "throwable.exchange.fanout", "random", null);
channel.basicPublish("throwable.exchange.fanout", "random",
MessageProperties.TEXT_PLAIN, "Fanout Message".getBytes(StandardCharsets.UTF_8));

//使用Topic类型的交换器
channel.queueDeclare("throwable.queue.topic", true, false, false, null);
channel.exchangeDeclare("throwable.exchange.topic", BuiltinExchangeType.TOPIC, true, false, null);
channel.queueBind("throwable.queue.topic", "throwable.exchange.topic", "topic.routingKey.#", null);
channel.basicPublish("throwable.exchange.topic", "topic.routingKey.throwable",
MessageProperties.TEXT_PLAIN, "Topic Message".getBytes(StandardCharsets.UTF_8));

//使用Headers类型的交换器
channel.queueDeclare("throwable.queue.headers", true, false, false, null);
channel.exchangeDeclare("throwable.exchange.headers", BuiltinExchangeType.HEADERS, true, false, null);
Map<String, Object> headerBindingArgs = new HashMap<>(8);
headerBindingArgs.put("headers.name", "throwable");
headerBindingArgs.put("headers.age", 25);
headerBindingArgs.put("x-match", "all");
//这里路由键随便写,或者用空字符串也可以,要添加Headers参数到绑定参数中
channel.queueBind("throwable.queue.headers", "throwable.exchange.headers", "random", headerBindingArgs);
AMQP.BasicProperties basicProperties = MessageProperties.TEXT_PLAIN;
//这里发送消息的时候要添加Headers参数到消息属性中
AMQP.BasicProperties realBasicProperties = basicProperties.builder().headers(headerBindingArgs).build();
channel.basicPublish("throwable.exchange.headers", "random",
realBasicProperties, "Headers Message".getBytes(StandardCharsets.UTF_8));
});
}
}

消息的元数据

消息元数据接口是com.rabbitmq.client.BasicProperties,实现类是com.rabbitmq.client.AMQP$BasicProperties,可以看一下具体的属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;

//省略Setter、Getter和Builder方法
}

属性分析:

  • contentType:消息内容类型,类似于HTTP协议中的Content-Type,例如:application/json。
  • contentEncoding:消息内容编码,类似于MIME的内容编码,例如:gzip。
  • headers:头部属性,K-V结构,一般使用在Headers的交换器和绑定中,很多时候被开发者滥用用来传输一些自定义属性,其实也无可厚非。
  • deliveryMode:消息的持久化模式,deliveryMode=1代表消息不持久化(nonpersistent),deliveryMode=2代表消息持久化(persistent)。
  • priority:消息优先级,可选值为0-255,值越大优先级越大,注意要和队列的优先级区分。
  • correlationId:客户端定义的用于客户端区分和标识消息的唯一标记。
  • replyTo:需要应答的目标队列名,只有一个持有值,不会有任何额外的操作,spring-amqp模块对这个值做了额外的操作,不要混淆原生Java驱动和二次封装的框架。
  • expiration:消息过期时间,单位为毫秒。
  • messageId:消息的唯一标识,消息中间件代理对消息接收去重的一个重要标识。
  • timestamp:消息发送时的时间戳。
  • type:可选的消息类型。
  • userId:可选的发布消息的用户的唯一标识。
  • appId:可选的发布消息的应用的唯一标识。
  • clusterId:集群唯一标识,AMQP-0-9-1已经弃用,供RabbitMQ集群应用程序使用的集群内路由标识符。

消息元数据的每个属性基本对应着AMQP规范中的属性值,以上的描述来源于AMQP协议,RabbitMQ中的实现要自行实践相关的属性com.rabbitmq.client.MessageProperties中已经有几个现成的BasicProperties实例,如果合适的话可以直接拿过来使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class MessageProperties {

/** Empty basic properties, with no fields set */
public static final BasicProperties MINIMAL_BASIC =
new BasicProperties(null, null, null, null,
null, null, null, null,
null, null, null, null,
null, null);
/** Empty basic properties, with only deliveryMode set to 2 (persistent) */
public static final BasicProperties MINIMAL_PERSISTENT_BASIC =
new BasicProperties(null, null, null, 2,
null, null, null, null,
null, null, null, null,
null, null);

/** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties BASIC =
new BasicProperties("application/octet-stream",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_BASIC =
new BasicProperties("application/octet-stream",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
public static final BasicProperties TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
1,
0, null, null, null,
null, null, null, null,
null, null);

/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}

mandatory属性的作用

mandatory属性主要是用于消息发送路由失败后配置消息返回(return)功能使用,可以故意造一下消息发布路由失败的场景,验证一下mandatory属性的作用。之前的例子中曾经建立过一个Direct类型的交换throwable.exchange.direct和队列throwable.queue.direct基于路由键direct.routingKey进行了绑定,我们开启mandatory特性,故意把路由键弄错,看效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MandatoryMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicPublish("throwable.exchange.direct", "doge",
true, false, null, "Mandatory Message".getBytes(StandardCharsets.UTF_8));
//使用addReturnListener(ReturnCallBack)也可以
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
StringBuilder builder = new StringBuilder();
builder.append("返回码:").append(replyCode).append("\n");
builder.append("返回信息:").append(replyText).append("\n");
builder.append("交换器:").append(exchange).append("\n");
builder.append("路由键:").append(routingKey).append("\n");
builder.append("消息体:").append(new String(body, StandardCharsets.UTF_8));
System.out.println(builder.toString());
}
});
//因为是异步回调,这里要sleep一下
Thread.sleep(2000);
});
}
}

执行之后,控制台打印:

1
2
3
4
5
返回码:312
返回信息:NO_ROUTE
交换器:throwable.exchange.direct
路由键:doge
消息体:Mandatory Message

可见路由失败的消息直接原样返回,这样就能确保路由错误的情况下消息也不会丢失。

消息发送的确认机制

前面提到的mandatory属性和消息返回机制能保证路由失败的消息也不丢失,实际上消息发送的时候允许使用消息发送确认(Confirm)机制,这样可以确认客户端发送的消息是否已经到达了消息中间件代理。消息发送的确认机制主要包括轻量级的确认和消息事务,这一小节介绍一下轻量级的确认。消息发送的轻量级确认需要把信道(Channel)更变为Confirm模式,通过等待消息中间件代理消息是否到达的确认回调,依赖到的方法或者类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//信道更变为Confirm模式
Confirm.SelectOk confirmSelect() throws IOException;

//等待消息中间件确认消息到达 - 同步阻塞法法
boolean waitForConfirms() throws InterruptedException;

//等待消息中间件确认消息到达,可以设置超时,单位毫秒 - 同步阻塞方法
boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

//等待消息中间件确认消息到达,存在任一消息未到达,则抛出IOException - 同步阻塞方法
void waitForConfirmsOrDie() throws IOException, InterruptedException;

//等待消息中间件确认消息到达,可以设置超时,单位毫秒,存在任一消息未到达,则抛出IOException - 同步阻塞方法
void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

//消息发布确认回调接口
public interface ConfirmListener {
void handleAck(long deliveryTag, boolean multiple)
throws IOException;

void handleNack(long deliveryTag, boolean multiple)
throws IOException;
}

消息发送确认模式开启之后,每条消息都会基于同一个信道下新增一个投递标签(deliveryTag)属性,deliveryTag属性是从1开始递增的整数,只要新建一个信道实例就会重置为1,一定要十分注意,这个消息投递标签和消息消费中的信封(Envelope)中的deliveryTag不是同一个属性,后者虽然也是从1开始递增,但是它是基于队列而不是信道。举个简单的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ConfirmMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("消息发送确认回调成功,序号:%s,是否批量:%s", deliveryTag, multiple));
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("消息发送确认回调失败,序号:%s,是否批量:%s", deliveryTag, multiple));
}
});
channel.confirmSelect();
channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
if (!channel.waitForConfirms()) {
System.out.println("消息发送确认失败!");
} else {
System.out.println("消息发送确认成功!");
}
Thread.sleep(2000);
});
}
}

消息发布确认基本上是遵循下面的代码模板:

1
2
3
4
5
6
7
8
channel.addConfirmListener(new ConfirmListener() {
//...
};
channel.confirmSelect();
channel.basicPublish();
if (!channel.waitForConfirms()) {
//...
}

当然,这里演示的仅仅是单条的消息发布确认,这种做法性能会相对低下,但是可靠性会提高,编码难度也相对比较低。

消息发布事务

消息发布事务能够保证消息发布到RabbitMQ的Broker这个动作是一个原子操作,也就是开启了消息发布事务模式,消息能明确知道发布成功或者失败。使用消息发布事务需要把信道转换为事务模式,然后进行消息发布和事务提交(或者回滚),依赖于下面的方法:

1
2
3
4
5
6
7
8
//信道转换为事务模式
Tx.SelectOk txSelect() throws IOException;

//提交事务
Tx.CommitOk txCommit() throws IOException;

//回滚事务
Tx.RollbackOk txRollback() throws IOException;

消息发布事务的基本代码模板如下:

1
2
3
4
5
6
7
try { 
channel.txSelect();
channel.basicPublish();
channel.txCommit();
}catch (Exception e){
channel.txRollback();
}

举个简单的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class MessagePublishTxMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
try {
channel.txSelect();
channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
}
});
}
}

一般来说,事物基本是遵循"等价交换的原则",消息发布的可靠性是需要性能换取的,消息发布事务的可靠性是最高的,但是代价是它的性能是比较低的。

消息的消费

消息消费主要包括推模式和拉模式,区别如下:

  • 推模式:客户端注册消费者到消息中间件代理的队列,也就是对队列进行订阅,消息中间件代理通过队列投递(deliver)消息到消费者中,典型方法是basic-consume
  • 拉模式:客户端主动向消息中间件代理拉取队列中的消息,典型方法是basic-get

消息消费之推模式

推模式下,消息的消费依赖于Channel的basicConsume方法(用的是最新的RabbitMQ的Java驱动,关于消息消费的方法新增了不少,在3.X版本只有几个方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
String basicConsume(String queue,
boolean autoAck,
String consumerTag,
boolean noLocal,
boolean exclusive,
Map<String, Object> arguments,
Consumer callback) throws IOException;

String basicConsume(String queue,
boolean autoAck,
String consumerTag,
boolean noLocal,
boolean exclusive,
Map<String, Object> arguments,
DeliverCallback deliverCallback,
CancelCallback cancelCallback,
ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

别看第二个方法的参数列表很庞大很吓人,它只是把com.rabbitmq.client.Consumer接口的部分方法拆解出来做成单独的接口,方便使用Lambda表达式,参数分析如下:

  • queue:消费者订阅的队列名称。
  • autoAck:是否自动确认(主动ack)。
  • consumerTag:消费者标签,队列中消费者的唯一标识,如果不指定则由消息中间件代理自动生成,停止消费者和取消消费者都是基于此标识属性。
  • noLocal:是否非本地的,如果此属性为true,则消息中间件代理不会投递消息到此消费者如果发布消息使用的连接和当前消费者建立的通道所在的连接是同一个连接,但是RabbitMQ不支持此属性。
  • exclusive:是否独占(排他)的,如果此属性为true,队列中只能有一个消费者(也就是当前设置了exclusive=true的消费者),消费者关闭(shutdown)
  • arguments:消费者参数,K-V结构。

下面看一下ConsumerDeliverCallbackCancelCallbackConsumerShutdownSignalCallback的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public interface Consumer {

//创建消费者成功的回调
void handleConsumeOk(String consumerTag);

//消费者取消成功的回调 - 主动调用basicCancel或者队列删除等因素
void handleCancelOk(String consumerTag);

//消费者取消的回调 - 主动调用basicCancel或者队列删除等因素
void handleCancel(String consumerTag) throws IOException;

//消费者关闭的信号回调
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

//AMQP方法basic.recover-ok的回调
void handleRecoverOk(String consumerTag);

//消息推模式下接受消息中间件代理投递的消息 - 核心方法
void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException;
}

@FunctionalInterface
public interface DeliverCallback {

//Delivery中持有了Envelope、AMQP.BasicProperties和消息体body
void handle(String consumerTag, Delivery message) throws IOException;
}

@FunctionalInterface
public interface CancelCallback {

void handle(String consumerTag) throws IOException;
}

@FunctionalInterface
public interface ConsumerShutdownSignalCallback {

void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
}

DeliverCallbackCancelCallbackConsumerShutdownSignalCallback实际上就是把Consumer接口中的部分方法抽离出来编写为函数式接口,没有特别的东西,所以我们还是关注Consumer接口的使用就可以了。Consumer接口有一个默认的实现类DefaultConsumer,可以直接使用。在旧版本的Java驱动中还存在一个废弃的QueueingConsumer,在5.X版本的驱动已经删除该类。其实,我们也可以自行实现Consumer接口,因为DefaultConsumer也仅仅是对Consumer接口进行了空实现,具体的方法还是需要我们覆盖实现自定义的逻辑。这里举个简单的使用例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ConsumeMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicConsume("throwable.queue.direct", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消息序号:" + envelope.getDeliveryTag());
System.out.println("交换器:" + envelope.getExchange());
System.out.println("路由键:" + envelope.getRoutingKey());
System.out.println("消息内容:" + new String(body, StandardCharsets.UTF_8));
}
});
//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
Thread.sleep(Integer.MAX_VALUE);
});
}
}

r-m-s-c-c-2.png

可以从Web管理界面看到消费者已经启动,消费者标签是由RabbitMQ代理随机生成的,我们开启了消息自动确认,所以Ack required一栏是空心的圆形,也就是不需要进行消息消费确认。还有一点需要注意的是:Consumer接口的回调也就是DefaultConsumer中的方法回调是委托到RabbitMQ的Java驱动中的线程池执行,过程是异步的,也就是我们在写Demo的时候需要想办法挂起DefaultConsumer实例所在的线程和主线程,否则会导致线程退出无法消费消息

消息消费之拉模式

拉模式下,消息消费主要依赖于Channel的basicGet方法:

1
GetResponse basicGet(String queue, boolean autoAck) throws IOException;

此方法很简单,只依赖于队列名称和是否自动确认两个参数,如果autoAck为false,需要手动确认。返回值如下:

1
2
3
4
5
6
7
8
9
10
11
12
public class GetResponse {
//信封对象,主要用于获取消息的投递标签DeliveryTag、交换器、路由键等
private final Envelope envelope;
//消息元数据
private final BasicProperties props;
//消息体
private final byte[] body;
//当前队列中剩余消息数量,只是个参考值
private final int messageCount;

//省略Getter方法
}

信封对象中的投递标签DeliveryTag很重要,用于手动确认的时候指定对应的值。举个简单的使用例子:

1
2
3
4
5
6
7
8
9
10
public class BasicGetMain extends BaseChannelFactory{

public static void main(String[] args) throws Exception{
provideChannel(channel -> {
GetResponse getResponse = channel.basicGet("throwable.queue.direct", true);
System.out.println(String.format("消息内容:%s,消息序号:%s", new String(getResponse.getBody(), StandardCharsets.UTF_8),
getResponse.getEnvelope().getDeliveryTag()));
});
}
}

消息消费的确认机制

消息消费的确认机制保障消息中间件代理的消息成功投递到消费者中,主要包括三种类确认:

  • 主动积极确认:主动积极确认成功后,消息会从队列中移除,支持批量确认操作,典型方法是basic-ack,下面直接叫ack。
  • 主动消极确认:消极积极确认成功后,基于basic-get或者basic-consume等到的消息标签,可以选择消息重新入队列或者直接丢弃,支持批量操作,典型方法是basic-nack,下面直接叫nack。
  • 拒绝:基于basic-get或者basic-consume等到的消息标签进行消息拒绝,可以选择丢弃或者重新入队,下面叫做reject。

nack和reject的基本功能是相同的,nack同时支持批量操作和单条操作,而reject只支持单条操作。消息消费确认其实是手动确认,如果针对的是basicConsume方法,则其autoAck属性需要设置为false,否则有可能会出现重复确认导致异常。

ack

ack依赖于Channel的basicAck方法:

1
void basicAck(long deliveryTag, boolean multiple) throws IOException;
  • deliveryTag:Envelope(信封)对象中的消息标签。
  • multiple:是否使用批量积极确认,如果此属性为true,则消息标签小于当前deliveryTag的所有消息都会被主动积极确认,不了解此属性最好使用false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class AckMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//这里autoAck设置为false
channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消息序号:" + envelope.getDeliveryTag());
System.out.println("交换器:" + envelope.getExchange());
System.out.println("路由键:" + envelope.getRoutingKey());
System.out.println("消息内容:" + new String(body, StandardCharsets.UTF_8));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
Thread.sleep(Integer.MAX_VALUE);
});
}
}

nack

nack依赖于Channel的basicNack方法:

1
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  • deliveryTag:Envelope(信封)对象中的消息标签。
  • multiple:是否使用批量消极确认,如果此属性为true,则消息标签小于当前deliveryTag的所有消息都会被主动消极确认,不了解此属性最好使用false。
  • requeue:是否重新入队,如果此属性为true,消息会被重新放置回去对应队列(如果可能的话,会放回到原来的位置),如果此属性为false,消息直接被丢弃。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class NackMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//这里autoAck设置为false
channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消息序号:" + envelope.getDeliveryTag());
System.out.println("交换器:" + envelope.getExchange());
System.out.println("路由键:" + envelope.getRoutingKey());
System.out.println("消息内容:" + new String(body, StandardCharsets.UTF_8));
channel.basicNack(envelope.getDeliveryTag(), false, false);
}
});
//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
Thread.sleep(Integer.MAX_VALUE);
});
}
}

属性requeue如果设置为true,需要谨慎设计程序的逻辑,否则很有可能导致消息一直重复消费失败并且重复重新入队,表现为消费者线程出现死循环逻辑,耗尽服务器CPU资源。

reject

reject的用法和nack基本一致,不过reject没有批量处理功能,依赖于Channel的basicReject方法:

1
void basicReject(long deliveryTag, boolean requeue) throws IOException;

简单举个使用例子:

1
2
3
4
5
6
7
8
9
public class RejectMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
GetResponse getResponse = channel.basicGet("throwable.queue.direct", true);
channel.basicReject(getResponse.getEnvelope().getDeliveryTag(), false);
});
}
}

消费消息不进行消息确认会怎么样

消息消费方法basiConsume中的autoAck属性设置为false,但是消费者接收到消息后不进行消息确认会怎么样?举个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NoneAckMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.basicPublish("throwable.exchange.direct", "direct.routingKey",
MessageProperties.TEXT_PLAIN, "Direct Message".getBytes(StandardCharsets.UTF_8));
//autoAck = false
channel.basicConsume("throwable.queue.direct", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("消息序号:" + envelope.getDeliveryTag());
System.out.println("交换器:" + envelope.getExchange());
System.out.println("路由键:" + envelope.getRoutingKey());
System.out.println("消息内容:" + new String(body, StandardCharsets.UTF_8));
}
});
//消息消费是异步的,要想办法阻塞消费者所在的线程和主线程,否则会退出
Thread.sleep(Integer.MAX_VALUE);
});
}
}

执行之后,控制台输出:

1
2
3
4
消息序号:1
交换器:throwable.exchange.direct
路由键:direct.routingKey
消息内容:Direct Message

查看RabbitMQ的Web管理界面,发现消息由Ready转变为Unacked状态:

r-m-s-c-c-3.png

尝试终止消费者所在线程,再次观察RabbitMQ的Web管理界面对应队列:

r-m-s-c-c-4.png

发现消息由Unacked状态恢复为Ready。这里需要注意,只有Ready状态的消息才能被消息中间件代理投递到消费者。总的来说就是:

  • 被路由到队列的新消息的状态为Ready,这种消息可以被消息中间件代理投递到客户端的消费者中。
  • 客户端消费者在消费消息的时候,如果采用手动确认(autoAck=false)并且没有主动确认(也就是没有调用basicAckbasicNack或者basicReject),那么消息就会变为Unacked状态,Unacked状态的消息只有当所有的消费者线程终止的时候,才会重新转变为Ready状态。

小结

这篇文章仅仅从基本使用来分析RabbitMQ中的消息发送、消费和确认的例子。关于消息发布确认机制和消息发布事务机制后面有专门的文章分析其性能和具体使用场景。

RabbitMQ中的消息发布确认(publish confirm)和消息消费(投递)确认(deliver confirm)能够确保消息发布和消息消费阶段消息不会丢失,至于策略应该根据具体场景选择,autoAck并不适合所有的场景。

(本文完 c-2-d e-a-20181125)

前提

如果能提前先阅读一下之前写过的一篇文章理解RabbitMQ中的AMQP-0-9-1模型,那么这篇文章应该会比较容易理解。

引入依赖

先确认已经安装了RabbitMQ的服务,并且开启了Web管理插件,方便直接从Web管理界面查找到队列、交换器和绑定。个人有软件洁癖,喜欢把软件和依赖保持升级到最新版本。引入RabbitMQ的Java驱动:

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.0</version>
</dependency>

本文介绍RabbitMQ通过其Java驱动声明队列、交换器和绑定。对于队列和交换器,其首次声明也是创建的操作。队列、交换器和绑定的声明依赖于通道(Channel),对应的是com.rabbitmq.client.Channel接口。在使用RabbitMQ的Java驱动的时候,一般在我们都使用下面的方式进行组件的声明操作:

  • 1、基于RabbitMQ连接信息构建com.rabbitmq.client.ConnectionFactory实例。
  • 2、基于ConnectionFactory新建一个com.rabbitmq.client.Connection实例。
  • 3、基于Connection新建一个com.rabbitmq.client.Channel实例。
  • 4、通过Channel实例声明(删除、解除绑定)队列、交换器或者绑定(Channel实例是可以复用的)。

虽然Channel实例是可以复用的,但是为了简化测试方法的编写,我们可以写下个简单的基础类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public abstract class BaseChannelFactory {

protected static void provideChannel(ChannelAction channelAction) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
try {
channelAction.doInChannel(channel);
} finally {
channel.close();
connection.close();
}
}

interface ChannelAction {

void doInChannel(Channel channel) throws Exception;
}
}

这样子,每次调用都是新建的Connection和Channel,我们只需要重点关注ChannelAction的实现即可。

在查看一下框架类型的API文档的时候有个很重要的技巧:如果提供的方法有重载,只需要看参数最多的基础方法

队列的相关操作

队列的相关参数主要包括队列的声明(declare)、删除(delete)和清空队列消息(purge)。

队列的声明

队列的声明依赖于com.rabbitmq.client.ChannelqueueDeclare方法。queueDeclare方法的多个重载都是同步方法,提供同样功能的还有一个异步方法queueDeclareNoWait。下面选取queueDeclare参数列表长度最大的方法进行分析:

1
2
3
4
5
Queue.DeclareOk queueDeclare(String queue,
boolean durable,
boolean exclusive,
boolean autoDelete,
Map<String, Object> arguments) throws IOException;

参数如下:

  • queue:需要声明的队列名称。
  • durable:是否开启持久化特性,如果此属性为true,消息中间件代理重启后队列会被重新声明(也就是说不会被删除),注意这个特性和消息的持久化特性完全不相关
  • exclusive:是否独占的,如果此属性为true,则队列的存在性绑定在创建它的连接上,意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
  • autoDelete:是否自动删除,如果此属性为true,意味着队列不被使用的时候会被消息中间件代理删除,实际上意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
  • arguments:K-V结构,队列参数,一般和消息中间件代理或者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等,后面会有专门文章介绍这些特性。

队列声明的返回值是Queue.DeclareOk实例:

1
2
3
4
5
6
public interface DeclareOk extends Method {
String getQueue();
int getMessageCount();
int getConsumerCount();
//...
}

可以从中得知声明的队列名、队列中的消息数量、队列当前的消费者数量,这个返回值对于无参数的queueDeclare方法十分有意义:

1
Queue.DeclareOk queueDeclare() throws IOException;

因为这个方法声明的队列名称是由消息中间件代理随机生成,队列名就是通过返回值告知客户端的。这里贴一个简单的例子:

1
2
3
4
5
6
7
8
9
public class QueueDeclareMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("throwable.queue", true, false, false, null);
System.out.println(declareOk.getQueue());
});
}
}

运行后控制台打印throwable.queue说明队列声明成功,可以查看RabbitMQ的Web管理界面:

r-m-d-1.png

可见队列的确已经被创建,但是Bindings一栏显示队列只绑定到默认的交换器中,这个时候其实已经可以通过默认的交换器向队列中发送消息。队列声明失败的时候会直接抛出异常,一般是IOException。上面的例子中是我们最常见到的队列声明方式,声明出来的队列开启了队列持久化特性、非独占的、非自动删除的,也就是即使RabbitMQ服务重启了,队列依然会存在(被重新声明),但是并不是所有的场景都需要这种声明方式(说实话,目前笔者没碰到不使用这种声明方式的场景,有点惭愧)。还有一点需要重点关注:队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常

队列的被动(Passive)声明

队列的被动声明,其实是检查队列在消息代理中间件是否存在的判断方法,依赖于Channel的queueDeclarePassive方法:

1
Queue.DeleteOk queueDelete(String queue) throws IOException;

它只依赖于一个参数-队列名称,如果队列名称对应的队列已经存在,则返回Queue.DeleteOk实例,如果队列不存在,会抛出IOException,通常是一个被包装为IOException的ShutdownSignalException,而ShutdownSignalException是运行时异常的子类。举个列子:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class QueueDeclarePassiveMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//队列throwable.queue已存在
AMQP.Queue.DeclareOk declareOk = channel.queueDeclarePassive("throwable.queue");
System.out.println(declareOk.getQueue());
//队列throwable.queue.passive不存在
declareOk = channel.queueDeclarePassive("throwable.queue.passive");
System.out.println(declareOk.getQueue());
});
}
}

由于throwable.queue.passive队列不存在,因此会抛出IOException,追踪异常栈查看底层的异常是:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue ‘throwable.queue.passive’ in vhost ‘/’, class-id=50, method-id=10)

利用这个方法的特性可以尝试编写一个方法确认队列是否存在,例如:

1
2
3
4
5
6
7
8
9
10
private static boolean isQueueExisted(Channel channel, String queueName) {
boolean flag = false;
try {
channel.queuePurge(queueName);
flag = true;
} catch (IOException e) {
//no-op
}
return flag;
}

队列的删除

队列的删除对应的是Channel的queueDelete方法:

1
2
3
4
//基于队列名删除队列,不关注队列是否被使用,也不关注队列中是否存在消息
Queue.DeleteOk queueDelete(String queue) throws IOException;

Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

参数如下:

  • queue:队列名称。
  • ifUnused:判断队列是否被不被使用,如果为true,只有不被使用的队列才能被删除。
  • ifEmpty:判断队列是否为空(没有消息积压),如果为true,只有空的队列才能被删除。

其实也就是只依赖队列名的单参数的queueDelete就是强制删除方法,举个例子:

1
2
3
4
5
6
7
8
9
10
public class QueueDeleteMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.DeleteOk deleteOk = channel.queueDelete("throwable.queue");
System.out.println(String.format("Delete queue [%s] successfully,message count = %d!",
"throwable.queue", deleteOk.getMessageCount()));
});
}
}

一般来说,队列的删除功能权限最好不要下放到应用程序中,除非有特殊的需要,如果需要删除队列,最好使用queueDelete(String queue, boolean ifUnused, boolean ifEmpty)方法,否则有可能造成消息丢失。

队列的清空

队列的清空操作会删除队列中的所有消息,依赖的方法是Channel的queuePurge方法:

1
Queue.PurgeOk queuePurge(String queue) throws IOException;

此方法会基于队列名清除对应队列中的所有内容,使用的时候需要谨慎,举个例子:

1
2
3
4
5
6
7
8
9
10
public class QueuePurgeMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
AMQP.Queue.PurgeOk purgeOk = channel.queuePurge("throwable.queue");
System.out.println(String.format("Purge queue [%s] successfully,message count = %d!",
"throwable.queue", purgeOk.getMessageCount()));
});
}
}

其实在Web管理界面中,每个队列所在的页面下有一个Purge按钮,该按钮的功能就是清空队列消息。

r-m-d-2.png

交换器的相关操作

交换器的相关操作主要包括交换器的声明和删除。

交换器的声明

交换器的声明方法依赖于Channel的exchangeDeclare方法,按照惯例查看它重载方法中参数列表长度最大的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;

参数解释如下:

  • exchange:交换器名称。
  • type:type可以为字符串或者BuiltinExchangeType类型,BuiltinExchangeType枚举只包括DIRECT、FANOUT、TOPIC和HEADERS,而字符串类型除了定义四种内建类型的交换器,实际上RabbitMQ允许自定义类型的交换器,不过很少使用。
  • durable:是否开启持久化特性,如果此属性为true,则消息中间件代理重启后,交换器不会删除,实际上是会被重新声明一次。
  • autoDelete:是否自动删除,如果此属性为true,当最后一个绑定到此交换器的队列解除绑定关系,交换器会被删除。
  • internal:是否内部的,如果此属性为true,该交换器只允许消息中间件代理使用,客户端无法使用。
  • arguments:交换器参数,K-V结构,参数一般和消息中间件代理或者插件的一些扩展特性相关,不依赖这些扩展特性直接使用null即可。

举个简单的使用例子:

1
2
3
4
5
6
7
8
9
10
public class ExchangeDeclareMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDeclare("throwable.exchange.direct",
BuiltinExchangeType.DIRECT, true, false, null);

});
}
}

执行完毕之后,RabbitMQ的Web管理器的Exchanges的选项卡中就能看到对应的交换器:

r-m-d-3.png

因为没有声明交换器和队列的绑定,所以Bindings一栏是空的。

交换器的被动声明

交换器的被动声明类似于队列的被动声明,用于通过交换器名称检查是否存在对应的交换器,依赖于Channel的exchangeDeclarePassive方法:

1
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

举个例子:

1
2
3
4
5
6
7
8
9
10
11
public class ExchangeDeclarePassiveMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
//throwable.exchange.direct存在
channel.exchangeDeclarePassive("throwable.exchange.direct");
//throwable.exchange.fanout不存在,会抛出IOException
channel.exchangeDeclarePassive("throwable.exchange.fanout");
});
}
}

类似可以写个检查交换器是否存在的工具方法:

1
2
3
4
5
6
7
8
9
10
private boolean isExchangeExisted(Channel channel, String exchangeName) {
boolean flag = false;
try {
channel.exchangeDeclarePassive(exchangeName);
flag = true;
} catch (IOException e) {
//no-op
}
return flag;
}

交换器的删除

交换器的删除依赖于Channel的exchangeDelete方法,方法只依赖于交换器的名称。

1
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;

举个例子:

1
2
3
4
5
6
7
8
9
public class ExchangeDeleteMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDelete("throwable.exchange.direct");

});
}
}

绑定的声明

前面提到队列的声明和交换器的声明,队列和交换器创建之后,需要声明两者的绑定关系,Channel中提供了两种声明绑定关系的方法:

  • queueBind方法,声明队列和交换器的绑定关系。
  • exchangeBind方法,声明交换器和交换器之间的绑定关系。

同时也提供解除绑定的方法:

  • queueUnbind方法:解除队列和交换器的绑定关系。
  • exchangeUnbind方法:解除交换器之间的绑定关系。

队列和交换器的绑定和解绑

队列和交换器的绑定主要依赖于Channel的queueBind,而解绑主要依赖于queueUnbind方法,按照惯例看这两个方法重载方法中参数列表长度最大的方法:

1
2
3
4
5
6
7
8
9
Queue.BindOk queueBind(String queue, 
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;

Queue.UnbindOk queueUnbind(String queue,
String exchange,
String routingKey,
Map<String, Object> arguments) throws IOException;

注意这两个方法的参数列表完全一致:

  • queue:队列名称。
  • exchange:交换器名称。
  • routingKey:路由键。
  • arguments:绑定参数,K-V结构,参数一般和消息中间件代理或者插件的一些扩展特性相关,不依赖这些扩展特性直接使用null即可。

基于声明队列和交换器间的绑定举个例子:

1
2
3
4
5
6
7
8
9
public class QueueBindMain extends BaseChannelFactory{

public static void main(String[] args) throws Exception{
provideChannel(channel -> {
//throwable.exchange.direct->throwable.queue
channel.queueBind("throwable.queue","throwable.exchange.direct", "throwable.routingKey",null);
});
}
}

声明成功之后,可以查看对应的队列和交换器中的Bindings一栏:

r-m-d-4.png

r-m-d-5.png

可见交换器和队列成功建立了绑定关系。接着可以尝试使用解绑方法进行绑定解除:

1
2
3
4
5
6
7
8
public class QueueUnbindMain extends BaseChannelFactory{

public static void main(String[] args) throws Exception{
provideChannel(channel -> {
channel.queueUnbind("throwable.queue","throwable.exchange.direct", "throwable.routingKey",null);
});
}
}

交换器之间的绑定和解绑

RabbitMQ中支持两个不同的交换器之间进行绑定和解除绑定,绑定方法依赖于Channel的exchangeBind方法,解除绑定依赖于Channel的exchangeUnbind方法:

1
2
3
4
5
6
7
8
9
Exchange.BindOk exchangeBind(String destination, 
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;

Exchange.UnbindOk exchangeUnbind(String destination,
String source,
String routingKey,
Map<String, Object> arguments) throws IOException;

参数如下:

  • destination:目标交换器名称。
  • source:来源交换器名称。
  • routingKey:路由键。
  • arguments:参数,K-V结构。

我们先预先建立一个Fanout类型的交换器,命名为throwable.exchange.fanout,接着,我们把Fanout类型的交换器throwable.exchange.fanout作为来源交换器,绑定到Direct类型的目标交换器throwable.exchange.direct上:

1
2
3
4
5
6
7
8
9
public class ExchangeBindMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeDeclare("throwable.exchange.fanout", BuiltinExchangeType.FANOUT, true, false, null);
channel.exchangeBind("throwable.exchange.direct", "throwable.exchange.fanout", "exchange.routingKey");
});
}
}

现在可以查看交换器throwable.exchange.direct的Bindings一栏的信息:

r-m-d-6.png

这就是现在我们通过交换器throwable.exchange.fanout发送消息,消息会先到达交换器throwable.exchange.direct,然后再路由到队列throwable.queue中。多重绑定的交换器在一些复杂的场景有重要的作用,但是目前来看还没有碰到使用场景(一般来说,存在即合理)

接着举个例子进行交换器之间的绑定解除:

1
2
3
4
5
6
7
8
public class ExchangeUnbindMain extends BaseChannelFactory {

public static void main(String[] args) throws Exception {
provideChannel(channel -> {
channel.exchangeUnbind("throwable.exchange.direct", "throwable.exchange.fanout", "exchange.routingKey");
});
}
}

小结

一旦队列和交换器之间的绑定关系声明完毕,我们可以通过交换器和可选的路由键向队列中发送消息,可以注册消费者到队列中获取消息。RabbitMQ中的队列、交换器和绑定有个特点:组件的声明只承认第一次,也就是队列名、交换器名是唯一的,组件可以反复声明,不过声明所使用的参数必须和首次声明的参数一致

(本文完 c-3-d e-a-20181125)

前提

之前有个打算在学习RabbitMQ之前,把AMQP详细阅读一次,挑出里面的重点内容。后来找了下RabbitMQ的官方文档,发现了有一篇文档专门介绍了RabbitMQ中实现的AMQP模型部分,于是直接基于此文档和个人理解写下这篇文章。

AMQP协议

AMQP全称是Advanced Message Queuing Protocol,它是一个(分布式)消息传递协议,使用和符合此协议的客户端能够基于使用和符合此协议的消息传递中间件代理(Broker,也就是经纪人,个人感觉叫代理合口一些)进行通信。AMQP目前已经推出协议1.0,实现此协议的比较知名的产品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ实现的AMQP版本是0.9.1,官方文档中也提供了该协议pdf文本下载,有兴趣可以翻阅一下。

消息中间件代理的职责

Messaging Broker,这里称为消息中间件代理。它的职责是从发布者(Publisher,或者有些时候称为Producer,生产者)接收消息,然后把消息路由到消费者(Consumer,或者有些时候称为Listener,监听者)。

因为消息中间件代理、发布者客户端和消费者客户端都是基于AMQP这一网络消息协议,所以消息中间件代理、发布者客户端和消费者客户端可以在不同的机器上,从而实现分布式通讯和服务解耦。

消息中间件代理不仅仅提供了消息接收和消息路由这两个基本功能,还有其他高级的特性如消息持久化功能、监控功能等等。

AMQP-0-9-1在RabbitMQ中的基本模型

AMQP-0-9-1模型的基本视图是:消息发布者将消息发布到交换器(Exchange)中,交换器的角色有点类似于日常见到的邮局或者信箱。然后,交换器把消息的副本分发到队列(Queue)中,分发消息的时候遵循的规则叫做绑定(Binding)。接着,消息中间件代理向订阅队列的消费者发送消息(push模式),或者消费者也可以主动从队列中拉取消息(fetch/pull模式)。

r-a-m-1.png

发布者在发布消息的时候可以指定消息属性(消息元数据),某些消息元数据可能由消息中间件代理使用,其他消息元数据对于消息中间件代理而言是不透明的,仅供消息消费者使用。

由于网络是不可靠的,客户端可能无法接收消息或者处理消息失败,这个时候消息中间件代理无法感知消息是否正确传递到消费者中,因此AMQP模型提供了消息确认(Message Acknowledgement)的概念:当消息传递到消费者,消费者可以自动向消息中间件代理确认消息已经接收成功或者由应用程序开发者选择手动确认消息已经接收成功并且向消息中间件代理确认消息,消息中间件代理只有在接收到该消息(或者消息组)的确认通知后才会从队列中完全删除该消息。

在某些情况下,交换器无法正确路由到队列中,那么该消息就会返回给发布者,或者丢弃,或者如果消息中间件代理实现了"死信队列(Dead Letter Queue)"扩展,消息会被放置到死信队列中。消息发布者可以选择使用对应的参数控制路由失败的处理策略。

交换器和交换器类型

交互器(Exchange)是消息发送的第一站目的地,它的作用就是就收消息并且将其路由到零个或者多个队列。路由消息的算法取决于交互器的类型和路由规则(也就是Binding)。RabbitMQ消息中间件代理支持四种类型的交互器,分别是:

交换器类型 Broker默认预声明的交换器
Direct (空字符串[(AMQP default)])和amq.direct
Fanout amq.fanout
Topic amq.topic
Headers amq.match (和RabbitMQ中的amq.headers)

声明交互器的时候需要提供一些列的属性,其中比较重要的属性如下:

  • Name:交互器的名称。
  • Type:交换器的类型。
  • Durability:(交换器)持久化特性,如果启动此特性,则Broker重启后交换器依然存在,否则交换器会被删除。
  • Auto-delete:是否自动删除,如果启用此特性,当最后一个队列解除与交换器的绑定关系,交换器会被删除。
  • Arguments:可选参数,一般配合插件或者Broker的特性使用。

之所以存在Durability和Auto-delete特性是因为并发所有的场景和用例都要求交互器是持久化的。

Direct交换器

Direct类型的交换器基于消息路由键(RoutingKey)把消息传递到队列中。Direct交换器是消息单播路由的理想实现(当然,用于多播路由也可以),它的工作原理如下:

  • 队列使用路由键K绑定到交换器。
  • 当具有路由键R的新消息到达交换器的时候,如果K = R,那么交换器会把消息传递到队列中。

direct-exchange

默认交换器

默认交换器(Default Exchange)是一种特殊的Direct交互器,它的名称是空字符串(也就是""),它由消息中间件代理预声明,在RabbitMQ Broker中,它在Web管理界面中的名称是(AMQP default)。每个新创建的队列都会绑定到默认交换器,路由键就是该队列的队列名,也就是所有的队列都可以通过默认交换器进行消息投递,只需要指定路由键为相应的队列名即可。

default-exchange

Fanout交换器

Fanout其实是一个组合单词,fan也就是扇形,out就是向外发散的意思,Fanout交换器可以想象为"扇形"交换器。Fanout交换器会忽略路由键,它会路由消息到所有绑定到它的队列。也就是说,如果有N个队列绑定到一个Fanout交换器,当一个新的消息发布到该Fanout交换器,那么这条新消息的一个副本会分发到这N个队列中。Fanout交换器是消息广播路由的理想实现。

fanout-exchange

Topic交换器

Topic交换器基于路由键和绑定队列和交换器的模式进行匹配从而把消息路由到一个或者多个队列。绑定队列和交换器的Topic模式(这个模式串其实就是声明绑定时候的路由键,和消息发布的路由键并非同一个)一般使用点号(dot,也就是’.’)分隔,例如source.target.key,绑定模式支持通配符:

  • 符号#匹配一个或者多个词,例如:source.target.#可以匹配source.target.dogesource.target.doge.throwable等等。
  • 符号*只能匹配一个词,例如:source.target.*可以匹配source.target.dogesource.target.throwable等等。

对每一条消息,Topic交换器会遍历所有的绑定关系,检查消息指定的路由键是否匹配绑定关系中的路由键,如果匹配,则将消息推送到相应队列。

topic-exchange

Topic交换器是消息多播路由的理想实现。

Headers交换器

Headers交换器是一种不常用的交换器,它使用多个属性进行路由,这些属性一般称为消息头,它不使用路由键进行消息路由。消息头(Message Headers)是消息属性(消息元数据)部分,因此,使用Headers交换器在建立队列和交换器的绑定关系的时候需要指定一组键值对,发送消息到Headers交换器时候,需要在消息属性中携带一组键值对作为消息头。消息头属性支持匹配规则x-match如下:

  • x-match = all:表示所有的键值对都匹配才能接受到消息。
  • x-match = any:表示只要存在键值对匹配就能接受到消息。

Headers交换器也是忽略路由键的,只依赖于消息属性中的消息头进行消息路由。

headers-exchange

队列

AMQP 0-9-1模型中的队列与其他消息或者任务队列系统中的队列非常相似:它们存储应用程序所使用的消息。队列和交换器的基本属性有类似的地方:

  • Name:队列名称。
  • Durable:是否持久化,开启持久化意味着消息中间件代理重启后队列依然存在,否则队列会被删除。
  • Exclusive:是否独占的,开启队列独占特性意味着队列只能被一个连接使用并且连接关闭之后队列会被删除。
  • Auto-delete:是否自动删除,开启自动删除特性意味着队列至少有一个消费者并且最后一个消费者解除订阅状态(一般是消费者对应的通道关闭)后队列会自动删除。
  • Arguments:队列参数,一般和消息中间件代理或者插件的特性相关,如消息的过期时间(Message TTL)和队列长度等。

一个队列只有被声明(Declare)了才能使用,也就是队列的第一次声明就是队列的创建操作(因为第一次声明的时候队列并不存在)。如果使用相同的参数再次声明已经存在的队列,那么此次声明会不生效(当然也不会出现异常)。但是如果使用不相同的参数再次声明已经存在的队列,那么会抛出通道级别的异常,异常代码是406(PRECONDITION_FAILED)。

队列名称

队列名必须由255字节(bytes)长度以内的UTF-8编码字符组成。实现AMQP 0-9-1规范的消息中间件代理具备自动生成随机队列名的功能,也就是在声明队列的时候,队列名指定为空字符串,那么消息中间件代理会自动生成一个队列名,并且在队列声明的返回结果中带上对应的队列名。

以"amq."开头的队列是由消息中间件代理内部生成的,有其特殊的作用,因此不能声明此类名称的新队列,否则会导致通道级别的异常,异常代码为403(ACCESS_REFUSED)。

队列的持久化特性

持久化的队列会持久化到磁盘中,这种队列在消息中间件代理重启后不会被删除。不开启持久化特性的队列称为瞬时(transient)队列,并非所有的场景都需要开启队列的持久化特性。

队列的持久化特性并不意味着路由到它上面的消息是持久化的,也就是队列的持久化跟消息的持久化是两回事。如果息中间件代理挂了,它重启后会重新声明开启了持久化特性的队列,这些队列中只有使用了消息持久化特性的消息会被恢复。

绑定

绑定(Binding)是交换器路由消息到队列的规则。例如交换器E可以路由消息到队列Q,那么Q必须通过一定的规则绑定到E。绑定中使用的某些交换器的类型决定了它可以使用可选的路由键(RoutingKey)。路由键的作用类似于过滤器,可以筛选某些发布到交换器的消息路由到目标队列。

如果发布的消息没有路由到任意一个目标队列,例如,消息已经发布到交换器,交换器中没有任何绑定,这个时候消息会被丢弃或者返回给发布者,取决于消息发布者发布消息时候使用的参数。

消费者

如果队列只有发布者生产消息,那么是没有意义的,必须有消费者对消息进行使用,或者叫这个操作为消息消费,消息消费的方式有两种:

  • 消息代理中间件向消费者推送消息(推模式,代表方法是basic.consume)。
  • 消费者主动向消息代理中间件拉取消息(拉模式,代表方法是basic.get)。

使用推模式的情况下,消费者必须指定需要订阅的队列。每个队列可以存在多个消费者,或者仅仅注册一个独占的消费者。

每个消费者(订阅者)都有一个称为消费者标签(consumer tag)的标识符,消费者标签是一个字符串。通过消费者标签可以实现取消订阅的操作。

消息确认

消费者应用程序有可能在接收和处理消息的时候崩溃,也有可能因为网络原因导致消息中间件代理投递消息到消费者的时候失败了,这样就会催生一个问题:AMQP消息中间件代理应该在什么时候从队列中删除消息?因此,AMQP 0-9-1规范提供了两种选择:

  • 消息中间件代理向应用程序发送消息(使用AMQP方法basic.deliverbasic.get-ok)。
  • 应用程序收到消息后向消息中间件代理发送确认(使用AMQP方法basic.ack <= 个人感觉这个地方少写了basic.nackbasic.reject)

前一种称为自动确认模型(动作触发的同时进行了消息确认),后一种称为显式确认模型。显式确认模型中,需要消费者主动向消息中间件代理进行消息主动确认,这个消息主动确认动作的执行时机完全由应用程序控制。消息主动确认有三种方式:积极确认(ack)、消极确认(nack)和拒绝(reject)。

预取消息

预取消息(Prefetching Messages)是一个特性。对于多个消费者共享同一个队列的情况,能够告知消息中间件代理在发送下一个确认之前指定每个消费者一次可以接收消息的消息量。这个特性可以理解为简单的负载均衡技术,在批量发布消息的场景下能够提高吞吐量。

消息属性和有效负载

AMQP模型中,消息具有属性值。AMQP 0-9-1规范定义了一些常见的属性,一般开发人员不需要太关注这些属性:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode (persistent or not)
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

这些通用的属性一般是消息中间件代理使用的,还有可以定制的可选属性header,形式是键值对,类似于HTTP中的请求头。消息属性是在发布消息的时候设置的。

AMQP消息还有一个有效载荷(payload,其实就是消息数据体),AMQP代理将其视为不透明的字节数组,也就是AMQP代理不会检查或者修改消息的有效载荷。有些消息可能只包含属性而没有有效负载。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)来序列化和结构化数据,以便将其作为消息有效负载发布。在一般约定下,消息属性中的Content typeContent encoding一般可以表明其序列化的方式。

消息发布支持消息的持久化特性,消息持久化特性开启后,消息中间件代理会把消息保存到磁盘中,如果重启代理消息也不会丢失。开启消息持久化特性将会影响性能,主要是因为涉及到刷盘操作。

AMQP-0-9-1方法

AMQP 0-9-1定义了一些方法,对应了客户端和消息中间件代理之间交互的一些操作方法,这些操作方法的设计跟面向对象编程语言中的方法没有任何共同之处。常用的交换器相关的操作方法有:

  • exchange.declare
  • exchange.declare-OK
  • exchange.delete
  • exchange.delete-OK

在逻辑上,上面几个操作方法在客户端和消息中间件代理之间的交互如下:

r-a-m-2.png

对于队列,也有类似的操作方法:

  • queue.declare
  • queue.declare-OK
  • queue.delete
  • queue.delete-OK

r-a-m-3.png

并非所有的AMQP操作方法都有响应结果操作方法,像消息发布方法basic.publish的使用是最广泛的,此操作方法没有对应的响应结果操作方法。有些操作方法可能有多个响应结果(操作方法),例如basic.get

连接(Connection)

AMQP的连接(Connection)通常是长期存在的。AMQP是一种使用TCP进行可靠传递的应用程序级协议。AMQP连接使用用户身份验证,可以使用TLS(SSL)进行保护。当应用程序不再需要连接到AMQP代理时,它应该正常关闭AMQP连接,而不是突然关闭底层TCP连接。

通道(Channel)

某些应用程序需要与AMQP代理程序建立多个连接。但是,不希望同时打开许多TCP连接,因为这样做会消耗系统资源并使配置防火墙变得十分困难。通道(Channel)可以认为是"共享一个单独的TCP连接的轻量级连接",一个AMQP连接可以拥有多个通道。

对于使用了多线程处理的应用程序,有一种使用场景十分普遍:每个线程开启一个新的通道使用,这些通道是线程间隔离的。

另外,每个特定的通道和其他通道是相互隔离的,每个执行的AMQP操作方法(包括响应)都携带一个通道的唯一标识,这样客户端就能通过该通道的唯一标识得知操作方法是对应哪个通道发生的。

虚拟主机(Virtual Host)

为了使单个消息中间件代理可以托管多个完全隔离的"环境"(这里的隔离指的是用户组、交互器、队列等),AMQP提供了虚拟主机(Virtual Host)的概念。多个虚拟主机类似于许多主流的Web服务器的虚拟主机,提供了AMQP组件完全隔离的环境。AMQP客户端可以在连接消息中间件代理时指定需要连接的虚拟主机。

个人理解

关于Exchange、Queue和Binding

理解RabbitMQ中的AMQP模型,其实从开发者的角度来看,最重要的是Exchange、Queue、Binding三者的关系,这里谈谈个人的见解。消息的发布第一站总是Exchange,从模型上看,消息发布无法直接发送到队列中。Exchange本身不存储消息,它在接收到消息之后,会基于路由规则也就是Binding,把消息路由到目标Queue中。从实际操作来看,声明路由规则总是在发布消息和消费消息之前,也就是一般步骤如下:

  • 1、声明Exchange。
  • 2、声明Queue。
  • 3、基于Exchange和Queue声明Binding,这个过程有可能自定义一个RoutingKey。
  • 4、通过Exchange消息发布,这个过程有可能使用到上一步定义的RoutingKey。
  • 5、通过Queue消费消息。

我们最关注的两个阶段,消息发布和消息消费中,消息发布实际上只跟Exchange有关,而消息消费实际上只跟Queue有关。Binding实际上就是Exchange和Queue的契约关系,会直接影响消息发布阶段的消息路由。那么,路由失败一般是什么情况导致的?路由失败,其实就是消息已经发布到Exchange,而Exchange中从既有的Binding中无法找到存在的目标Queue用于传递消息副本(一般而言,很少人会发送消息到一个不存在的Exchange)。消息路由失败,从理解AMQP的模型来看,可以从根本上避免的,除非是消息发布者故意胡乱使用或者人为错误使用了未存在的RoutingKey、Exchange或者说是Binding关系而导致的。

关于Exchange的类型

AMQP-0-9-1模型中支持了四种交换器direct(单播)、fanout(广播)、topic(多播)、headers,实际上,从使用者角度来看,四种交换器的功能是可以相互取代的。例如可以使用fanout类型交换器实现广播,其实使用direct类型交换器也是可以实现广播的,只是对应的direct类型交换器需要通过多个路由键绑定到多个目标队列中。在面对生产环境的技术选型的时候,我们需要考虑性能、维护难度、合理性等角度去考虑选择什么类型的交换器,就上面的广播消息的例子,显然使用fanout类型交换器可以避免声明多个绑定关系,这样在性能、合理性上是更优的选择。

关于负载均衡

在AMQP-0-9-1模型中,负载均衡的实现是基于消费者而不是基于队列(准确来说应该是消息传递到队列的方式)。实际上,出现消息生产速度大大超过消费者的消费速度的时候,队列中有可能会出现消息积压。AMQP-0-9-1模型中没有提供基于队列负载均衡的特性,也就是出现消息生产速度大大超过消费者的消费速度时候,并不会把消息路由到多个队列中,而是通过预取消息(Prefetching Messages)的特性,确定消息者的消费能力,从而调整消息中间件代理推送消息到对应消费者的数量,这样就能够实现消费速度快的消费者能够消费更多的消息,减少产生有消费者处于饥饿状态和有消费者长期处于忙碌状态的问题。

关于消息确认机制

AMQP中提供的消息确认机制主要包括积极确认(一般叫ack,Acknowledgement)、消极确认(一般叫nack,Negative Acknowledgement)和拒绝(reject)。消息确认机制是保证消息不丢失的重要措施,当消费者接收到消息中间件代理推送的消息时候,需要主动通知消息中间件代理消息已经确认投递成功,然后消息中间件代理才会从队列中删除对应的消息。没有主动确认的消息就会变为"nack"状态,可以想象为暂存在队列的"nack区"中,这些消息不会投递到消费者,直到消费者重启后,"nack区"中的消息会重新变为"ready"状态,可以重新投递给消费者。关于消息确认机制其实场景比较复杂,后面再做一篇文章专门分析。

小结

参考资料:

前提

工作接近3年,一直有使用RabbitMQ作为服务间解耦的中间件,但是一直没有做一系列学习和总结,这里决心做一个系列总结一下RabbitMQ的运维、使用以及生产中遇到的问题等,以便日后直接拿起来使用。整个系列使用的Linux系统为CentOS 7的最新版本CentOS-7-x86_64-Minimal-1804。而RabbitMQ Server使用当前最新的版本3.7.9.RELEASE。

RabbitMQ Server的安装

RabbitMQ Server使用Erlang语言编写,Erlang语言的并发编程支持比较优异,所以我们要先安装Erlang(类似于我们需要运行Java程序,要先安装JVM):

1
2
# 添加erlang的yum源
rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc

或者直接在目录/etc/yum.repos.d/手动添加一个新的.repo文件(文件名可以随意如erlang.repos),内容是:

1
2
3
4
5
6
[erlang-solutions]
name=CentOS $releasever - $basearch - Erlang Solutions
baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
gpgcheck=1
gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
enabled=1

然后执行命令安装Erlang:

1
2
# 安装erlang
sudo yum install erlang

安装完成之后Erlang会自行后台运行,输入erl就能进入Erlang的命令行工具说明安装成功:

r-s-1.png

安装Erlang过程中如果提示:

1
2
error: Failed dependencies:
epel-release is needed by erlang-solutions-1.0-1.noarch

说明缺少epel-release依赖,通过sudo yum install epel-release安装epel-release即可。

接着可以安装RabbitMQ Server,先下载其RPM安装包:

1
2
## 下载
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm

接着在下载文件目录中执行安装命令:

1
2
3
4
# 在Yum仓库可以使用之前,需要让RPM工具信任RabbitMQ的rpm包的签名,需要执行下面的命令
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
# Yum安装
yum install rabbitmq-server-3.7.9-1.el7.noarch.rpm

安装完成后,RabbitMQ Server会自行在后台运行,这个时候可以执行命令rabbitmqctl status验证其状态:

r-s-2.png

RabbitMQ Server启动于停止

RabbitMQ Server已经成功安装为CentOS 7的服务,它的启动和停止可以直接使用systemctl命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 启动
systemctl start rabbitmq-server

# 停止
systemctl stop rabbitmq-server

# 重启
systemctl restart rabbitmq-server

# 前台启动,shell关闭会shutdown
rabbitmq-server start

# 后台启动
rabbitmq-server -detached

当然,也可以使用RabbitMQ Serverrabbitmqctl命令(格式是:rabbitmqctl [-n <node>] [-l] [-q] <command> [<command options>]):

1
2
3
4
# 停止Erlang上的node节点
rabbitmqctl stop_app
# 启动
rabbitmqctl start_app

安装Web管理插件

RabbitMQ Server管理插件的命令是rabbitmq-plugins [-n <node>] [-l] [-q] <command> [<command options>],command部分目前只有下面几个:

1
2
3
4
5
6
Commands:
disable <plugin>|--all [--offline] [--online]
enable <plugin>|--all [--offline] [--online]
help <command>
list [pattern] [--verbose] [--minimal] [--enabled] [--implicitly-enabled]
set [<plugin>] [--offline] [--online]

我们可以通过命令rabbitmq-plugins list先展示所有可用的插件:

r-s-3.png

其中的rabbitmq_management就是我们需要安装的Web管理界面,执行下面的命令启用Web管理插件:

1
rabbitmq-plugins enable rabbitmq_management

r-s-4.png

实际上是启用了rabbitmq_managementrabbitmq_management_agentrabbitmq_web_dispatch三个插件。插件启动完毕后,我们需要添加一个新的用户或者修改原有的guest用户的权限,因为guest用户只允许使用localhost访问Web管理界面。

用户管理

用户账号密码管理常用命令如下:

1
2
3
4
5
6
7
8
9
10
# 新增一个用户
rabbitmqctl add_user ${username} ${password}
# 修改用户密码
rabbitmqctl change_password ${old_password} ${new_password}
# 验证用户密码
rabbitmqctl authenticate_user ${username} ${password}
# 删除用户
rabbitmqctl delete_user ${username}
# 展示用户列表
rabbitmqctl list_users

例如创建一个用户名和密码都是root的账号:rabbitmqctl add_user root root

用户角色(Tag)管理常用命令格式是rabbitmqctl set_user_tags ${username} ${tag...},可选的角色类型有:

Tag 描述
none 无角色,新创建的用户就是这类型的Tag
management 具备Web管理界面访问权限
policymaker 具备management所有权限,可以管理policy和parameter
monitoring 具备policymaker所有权限,可以监控连接、channel、节点信息等
administrator 管理员权限

例如为root账号赋予管理员权限:rabbitmqctl set_user_tags root administrator

用户权限(Permission)管理常用命令如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 添加权限
# vhost:虚拟host,默认为"/"
# username:用户名
# conf:配置权限,可以用正则表达式
# write:写权限,可以用正则表达式
# read:读权限,可以用正则表达式
rabbitmqctl set_permissions [-p vhost] ${username} ${conf} ${write} ${read}

# 查看虚拟host权限
rabbitmqctl list_permissions [-p vhost]

# 查看用户权限
rabbitmqctl list_user_permissions ${username}

# 清除用户权限
rabbitmqctl clear_permissions [-p vhost] ${username}

例如为root账号赋予虚拟host为"/"下的所有的配置、读、写权限:rabbitmqctl set_permissions root ".*" ".*" ".*"

我们可以使用root账号登录Web管理界面:

r-s-5.png

关于RabbitMQ Server多租户(虚拟Host)、角色、权限管理的其他细节暂时不展开,因为可能需要不少的篇幅才能说明。

小结

关于RabbitMQ Server的命令和运维方面的东西暂时不大量展开,按照上面几节搭建好的RabbitMQ服务对于测试或者开发调试已经基本可用,接着就可以通过官方提供的例子进行学习。

参考资料:

参考资料的链接在将来不确定是否有变,主要是参考了erlang和rabbitmq的官方文档的安装提示。

(本文完 c-1-d e-20181118)