消息中间件之RabbitMQ探索指南
前言
本文讲解和学习 RabbitMQ 服务的各种基本概念,使用方法和原理。
1. RabbitMQ
RabbitMQ 是实现了AMQP(高级消息队列协议) 的消息中间件。RabbitMQ服务最初由 Rabbit 公司开发,使用Erlang 语言编写,2007年发布并开源,2010年被 VMware 收购。

RabbitMQ 版本: 3.8.8 Release (2020-09月)
客户端 Java版本:5.9.0
为什么使用消息中间件
异步
解耦
削峰
1.1 模型架构
RabbitMQ 整体上是一个生产者与消费者模型,主要负责消息的发送、存储和接收。

2. AMQP协议
Advanced Message Queuing Protocol (AMQP) 高级消息队列协议是应用层协议,工作在TCP协议之上。愿景:成为所有消息中间件的标准协议。
官网:amqp.org
AMQP协议的版本时间线:
版本 | 时间 |
---|---|
AMQP 0.8 版 | 2006-06 |
AMQP 0.9.1版 | 2008-11-13 |
AMQP v1.0 final 版 | 2011-10-07 |
2012年10月,高级消息队列协议(AMQP) 成为OASIS国际标准。从时间线看,即便是最后一次发布的 v1.0 final 版本距现在也有很多年的时间了,所以AMQP 现在是一个很成熟和稳定的规范协议。RabbitMQ 服务可以看做是AMQP协议的实现者。
2.1 特点
RabbitMQ 的特性有很多,主要有如下:
特性 | 描述 |
---|---|
可靠性 | RabbitMQ 使用一些机制来保证可靠性,如持久化、发送确认和接收确认等。 |
灵活的路由 | 在消息进入队列之前,通过交换器来路由消息。 |
多种协议 | RabbitMQ 除了原生的支持AMQP协议外,还支持HTTP、STOMP、MQTT等多种协议。 |
多语言客户端 | RabbitMQ 支持几乎所有常用的开发语言,如Java、Python、PHP、C#、JavaScript等。 |
支持拓展插件 | 提供很多可插拔的插件安装。 |
Web管理界面 | RabbitMQ 提供了用户Web管理界面,可以监控和管理消息,查看集群节点等。 |
扩展性 | 多个RabbitMQ 节点可以组成一个集群。 |
高可用性 | 部分节点出现问题的情况下,队列仍然可用。 |
3. 基本概念
基本概念总览:
概念 | 描述 |
---|---|
生产者(Producer) | 消息的生产者,也就是发布消息的一方; |
消费者(Consumer) | 消息的消费者,也就是接受消息的一方; |
消息(Message) | 消息包含两部分,消息体和标签; |
服务节点(Broker) | 一个Broker是一个RabbitMQ服务 |
连接(Connection) | 表示一个TCP连接 |
通道(Channel) | 建立在TCP上的虚拟连接 |
虚拟主机(vhost) | 用于逻辑隔离 |
交换器(Exchange) | 路由消息到队列或交换器 |
队列(Queue) | 存储消息 |
绑定(Binding) | 交换器与队列,交换器与交换器之间需要绑定 |
路由键(RoutingKey) | 消息路由的关键字,字符串类型,单词之间用英文句号隔开 |
AMQP协议 | 高级消息队列协议 |
3.1 连接 Connection
一个Connection 表示一个TCP连接,操作系统创建和断开TCP连接的开销很大,所以一般情况下都会复用 Connection 。
使用Java客户端创建一个连接:
private static void connect() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
}
注:
- guest 用户(密码 guest)是RabbitMQ默认创建的超级管理员,开发中应该创建新的用户来连接MQ服务器。
- 一般情况下,会有多个客户端同时连接RabbitMQ服务器,应当为每一个客户端建立一个用户,以便区分每个客户端的连接情况。
- 虚拟主机是建立在连接上的概念,如果不指定,默认是”/“。
3.2 通道 Channel
一个连接(Connection)可以创建出多个通道实例(Channel),Channel 是客户端中存在的虚拟连接,多个Channel 共用一个连接,节省资源的同时最大化利用资源。

客户端创建一个Channel:
Channel channel = connection.createChannel();
无论生产者还是消费者,客户端与RabbitMQ服务器的几乎所有交互都是通过 Channel 对象进行操作的。RabbitMQ官方提供的Java客户端的包名为 com.rabbitmq.client
,Channel 接口定义了客户端的所有基本操作,所以想要全面了解Channel 的功能,Channel接口应该重点了解。
创建完Channel ,就可以声明交换器和队列,进行绑定,发送或接收消息了。
:warning: 特别注意:
通道 Channel 并不是并发安全的实例,因此不能多线程同时操作。应该每一个线程使用一个Channel,而不是多线程之间共用一个Channel。
3.3 交换器(Exchange)
交换器、队列和绑定是RabbitMQ的三大组件。
交换器用来路由消息,RabbitMQ中将消息路由到队列的组件。
创建交换器
创建交换器:
//创建一个非持久化、非自动删除的 direct 类型的交换器
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
//创建一个持久化、自动删除、公开的 topic 类型的交换器
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,true,false,null);
exchangeDeclare 方法有多个重载,它们最终都会调用参数最全的一个:
public interface Channel extends AutoCloseable {
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
}
参数如下:
属性 | 描述 |
---|---|
exchange | 交换器名称 |
type | 类型 |
durable | 是否持久化 |
autoDelete | 是否自动删除 |
internal | 是否内部交换器 |
arguments | 参数 |
详解:
type:交换器共有四种类型:direct、topic 、fanout 、headers ;
durable:是否持久化,持久化可以将交换器存储到磁盘,如果交换器没有被删除的情况下,服务器重启后不会丢失相关信息。
autoDelete:设置为 true 表示自动删除,如果该交换器至少被一个队列或交换器绑定过,并且之后所有绑定的队列或交换器都与之解绑,那么该交换器将自动删除。这个地方容易错误理解,如果该交换器从没有被绑定过,那么不会自动删除。
internal :设置是否为内置交换器,如果是内置交换器,则生产者不能直接将消息发送到内置交换器,只能通过交换器路由到交换器这一种方式。
arguments:key-value 形式的参数,如 alternate-exchange等。
四种交换器类型
交换器类型有四种:
- direct 直接路由
- topic 主题路由
- fanout 广播路由
- headers 头部信息路由
direct 直接路由
路由规则:
根据Routing Key = Bounding Key 的规则路由消息。
使用场景:
区分发往同一个交换器的消息。

topic 主题路由
路由规则:
根据通配符匹配路由消息。
- 星号
*
代办零个或一个单词。例如:com - 井号
#
代办零个、一个或多个单词,单词之间通过英文句号.
隔开。例如:amqp.org.doc
使用场景:
通过主题对消息进行区分。

使用场景:
fanout 广播路由
路由规则:
fanout Exchange 无视Bonding Key 的匹配规则,将消息发送到与该交换器绑定的所有队列中。
使用场景:
广播消息。

headers 头部信息路由
headers 类型的交换器无视路由键的规则,在发送消息时,BasicProperties 有一个 headers 属性,用于指定key-value 形式的头部信息。交换器和队里进行绑定时,也会有一个key-value 形式的绑定参数,如果交换器是 headers 类型,那么这两组键值对完全匹配时才能路由成功。
//发送消息的头部参数
HashMap<String, Object> args = new HashMap<>();
args.put("userid",10);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.headers(args);
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"com.rabbitmq.",true,basicProperties,message.getBytes());
//绑定参数
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
注:
headers 类型的交换器效率低,而且不实用,很少会使用它。
删除交换器
//删除交换器
channel.exchangeDelete(exchangeName);
3.4 队列(Queue)
RabbitMQ中存储消息的缓存区,先进先出FIFO的数据结构。
RabbitMQ中的消息只能存入队列中,队列必须绑定交换器。
多个消费者可以连接同一个队列,队列中消息被多个消费者轮询消费,但队列中的消息只会被消费一次。
队列只能和交换器进行绑定,而交换器既可以和队列绑定,也可以和交换器绑定。内部交换器只能与交换器进行绑定。
声明队列
Channel 声明队列的方式:
public interface Channel extends AutoCloseable {
//声明队列:MQ服务器自动命名,自动删除,非持久化的队列
Queue.DeclareOk queueDeclare() throws IOException;
//声明队列:指定队列名称,是否持久化,是否排他,是否自动删除,参数
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
//异步声明队列:不保证队列声明成功;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
//查询队列是否存在,如果不存在则抛出 IOException 异常,并关闭 channel
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
}
参数含义:
参数 | 描述 |
---|---|
queue | 队列名称 |
durable | 是否持久化,存入磁盘,服务重启数据不会丢失。 |
exclusive | 是否排他,如果设置为排他队列,那么该队列仅对首次声明它的连接可见,并且当连接断开时,队列自动删除。排他队列适用于一个客户端同时发送和读取消息的场景。 |
autoDelete | 是否自动删除,设置为true表示自动删除,自动删除的前提是之前至少有一个消费者连接到这个队列,并且之后所有的连接都断开时,队列自动删除。如果从没有消费者连接过这个队列,队列是不会自动删除的。 |
arguments | 设置队列的其他参数(键值对形式)。 |
常用参数
# 设置队列中消息最大条数
x-max-length
# 设置队列最大容量(单位:字节)
x-max-length-bytes
# 设置队列中消息的过期时间(单位:毫秒)
x-message-ttl
# 设置队列的过期时间(单位:毫秒)
x-expires
# 设置队列的最大优先级
x-max-priority
# 设置队列的死信交换器
x-dead-letter-exchange
# 设置死信交换器的路由键
x-dead-letter-routing-key
每个参数都对应着相应的功能,下面会分别详细讲解。
清空队列
使用 channel.queuePurge 清空队列中的所有消息,队列本身不删除。
channel.queuePurge(queueName);
删除队列
//删除队列
channel.queueDelete(queueName);
3.5 绑定(Binding)
交换器与队列的关联规则,通过 Binding Key 将交换器与队列或交换器与交换器进行绑定。两个组件之间的绑定关系可以有多个。
API:
交换器和队列的绑定和解绑:
//绑定交换器和队列
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
//解绑队列和交换器
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
交换器和交换器的绑定和解绑:
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
发送消息的路由键、交换器类型、绑定键共同决定了消息被路由到哪个队列。
3.6 发送消息
通过 channel.basicPublish
将消息发送到交换器:
API:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.deliveryMode(2).//持久化消息
priority(6); //设置消息优先级
AMQP.BasicProperties basicProperties = builder.build();
//发送消息
channel.basicPublish(orderExchange,routingKey,true,basicProperties,message);
mandatory 参数:
默认情况下,在发送消息时如果交换器根据自身的类型和路由键无法匹配到队列,消息将丢弃。设置 mandatory 参数为true,RabbitMQ将会通过 Basic.Return 命令把消息返还给生产者。
immediate 参数:
如果immediate 参数设置为true,消息被路由到队列后,发现队列上并不存在任何消费者,那么这条消息不会存入当前队列;如果路由到的所有队列都没有消费者时,MQ服务器会通过Basic.Return 命令把消息返还给生产者。
channel.basicPublish(orderExchange,routing,true,basicProperties,message);
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消息投递失败,消息返回: "+message);
}
});
mandatory 和 immediate 参数有一个设置为true时,就可以增加返回监听了。如果都设置为true,那么返回的消息有可能是交换器匹配不到队列,又或者是所有匹配的队列上没有消费者产生的。如果两个参数都不设置或都为false,则没有必要增加返回监听。
注:
immediate 参数不建议去设置使用,首先会降低效率,其次会造成多种结果,如果消息被路由到两个队列,一个队列上有消费者,一个没有消费者,那么该消息不会存入无消费者的队列,而且消息不会返还给生产者。如果想实现immediate 参数相关的功能,可以使用设置过期时间来代替。
3.7 接收消息
消费者有两种方式来接收消息,自动推送和手动获取,也叫推模式和拉模式。
- 推模式
- 拉模式
推模式即RabbitMQ服务器源源不断的自动推送消息给消费者客户端,而拉模式需要消费者手动的一个个拉取消息。
推模式
消费者使用 channel.basicConsume
方法实现消息的自动推送。
API:
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
queue:
队列名称
autoAck:
消息是否自动确认,默认是true,即自动确认。设置为false改为手动确认,可以使系统更加健壮。
callback:
Consumer 接口类型的回调,消费者订阅队列后,通过该接口接收通知消息。应该避免回调的接口方法执行时间过长,因为这将延迟将消息分发到同一个频道上的其它使用者。
DefaultConsumer 是该接口的默认空实现,消费者可以通过将DefaultConsumer 子类化实现自己的逻辑。
使用:
boolean autoACK = false;
channel.basicQos(100);
channel.basicConsume(queueName,autoACK,new MyConsumer(channel));
class MyConsumer extends DefaultConsumer{
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//接收消息
String message = new String(body);
//处理消息
handleMessage(message);
//确认消息
getChannel().basicAck(envelope.getDeliveryTag(),false);
}
}
Qos 参数
采用推模式获取消息,消息会自动推送到消费者客户端,推模式受 Qos 参数的影响,即未确认的消息上限。当未确认的消息数量达到设置的上限后,服务器将不再推送消息给客户端,直到未确认的消息数量低于上限。未确认消息的阀值范围是0到 65535,设置 Qos 选项非常有用,它可以达到很好的限流作用,正常情况下,作为消费者的业务服务器接收MQ中推送的消息,然后处理并确认,如果消息产生的速度大于消费的速度,那么消费者未处理的消息和任务会越来越多,有可能导致消费者出现异常。
设置Qos 阀值后,消费者未确认的消息达到阀值,说明消费者没有来的及处理这么多消息,消息暂且存在MQ中,起到了很好的限流和削峰作用。
拉模式
消费者使用 channel.basicGet
方法手动拉取消息。
API:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
使用:
//拉取消息
GetResponse mqResponse = channel.basicGet(queueName, false);
//获取消息体
byte[] body = mqResponse.getBody();
注:
拉模式不受 Qos 参数的影响,只要队列中有未确认的消息,就能拉取成功。
确认消息
消费者接收到消息,默认是自动确认,即接收到消息立即自动确认,如果消息没有被消费者正常处理,那么该消息也算丢失了。在获取消息时可以将自动确认关闭,处理完消息后再手动确认消息接受成功。
API:
//消息确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag :
deliveryTag 是消息标识,代表一条消息。
multiple:
false 表示只确认deliveryTag 这一条消息,true 表示确认 deliveryTag 编号及之前的所有未确认的消息。
拒绝消息
消费者接收到消息,也有可能进行拒绝,使用 Basic.Reject
拒绝单条消息 或 Basic.Nack
拒绝多条消息。
API:
//单条拒绝
void basicReject(long deliveryTag, boolean requeue) throws IOException;
//批量拒绝
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
deliveryTag :
deliveryTag 是消息标识,代表一条消息,类型是64位长整型,从1开始递增。
requeue :
拒绝后是否重新入队,true表示重新存入队列,false表示从队列移除该消息。
multiple:
用于拒绝多个消息时使用,true表示拒绝 deliveryTag 编号及之前的所有未被确认的消息,false 则仅仅拒绝deliveryTag 这一条消息,和 basicReject 方法相同。
使用:
//单条拒绝
channel.basicReject(deliveryTag,false);
//批量拒绝
channel.basicNack(deliveryTag,true,false);
注:
消息被拒绝后,如果不重新入队,不额外处理,消息将会丢弃。可以通过设置死信交换器/队列来收集消费者拒绝的消息。
3.8 虚拟主机 vhost
AMQ中的虚拟主机用于逻辑隔离。每一个vhost 相当于一个独立的小型RabbitMQ服务器,拥有自己独立的交换器、队列以及绑定关系,甚至是通道和连接,vhost 之间是绝对隔离的,不同 vhost 之间的交换器和队列是无法进行绑定的。
RabbitMQ默认创建的vhost 为“/”,如果业务上不需要逻辑隔离,那么使用默认的vhost 也是合理的。但当业务达到一定规模的时候,建议按业务场景归类区分,进行隔离,这样不仅可以保证安全,还便于迁移。
4. 进阶开发
4.1 事务机制
事务就是一组任务要么全部成功,要么全部失败,不允许存在中间状态。RabbitMQ中的事务与数据库中的事务不一样,RabbitMQ中的事务非常简单,只提供一种消息传输的可靠性。
RabbitMQ与事务相关的方法有3个:
channel.txSelect(); //通道开启事务模式
channel.txCommit(); //提交事务
channel.txRollback(); //回滚事务
用法:
try {
channel.txSelect();
channel.basicPublish("exchangeName","routingKey",true,basicProperties,message);
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
channel.txRollback();
}
如果是循环发送消息,只需要将 channel.basicPublish 和 channel.txCommit、txRollback方法包裹在循环体内,
事务机制在发送一条消息之后会使生产者阻塞,直到Broker给出回应,才能继续进行下一步操作。使用事务发送消息的效率还是比较低的,降低Broker的消息吞吐量。
由此RabbitMQ提供了另一种消息传输保障-发送确认机制(publisher confirm)。
4.2 发送确认机制
消息首先会被发送到服务器的交换器,如果在这一环节发生异常,如交换器不存在,RabbitMQ提供了发送确认机制(publisher confirm)保障数据传输的可靠性。
同步确认
使用 channel.waitForConfirms() 同步等待MQ服务器的确认应答:
//通道开启publisher confirm模式
channel.confirmSelect();
channel.basicPublish("exchangeName","routingKey",true,basicProperties,message.getBytes());
if (!channel.waitForConfirms()){
//消息发送失败后的处理...
}
waitForConfirms() 方法是阻塞方法,
boolean waitForConfirms()
boolean waitForConfirms(long timeout)
注:
通道只有开启 confirm 模式,才能调用waitForConfirms() 方法,否则会报错。
异步确认
使用 waitForConfirms() 同步确认方式和使用事务的方式在效率方面区别不是很大,消息吞吐量低,效率都一般。
//通道开启publisher confirm模式
channel.confirmSelect();
channel.basicPublish("exchangeName","routingKey",true,basicProperties,message.getBytes());
//服务器的确认应答监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
//Broker回传ACK命令,表示消息已经到达交换器
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
//Broker回传Nack命令
}
});
总结:
事务机制和 publisher confirm 机制两者是互斥的,如果试图在已开启事务模式的信道上再设置 publisher confirm 模式,则会报错。反过来说,如果试图在已开启 publisher confirm 模式的信道上再开启事务模式,同样会报错。
事务机制和publisher confirm 机制只是保证消息可以发送到服务器的交换器,如果交换器没有绑定的队列或没有匹配的队列,那么默认情况下这个消息会被丢失,所以此时还需要另一种机制来进一步提供保障,那就是设置 mandatory 参数或备份交换器。
4.3 mandatory 参数
前面提到,事务机制和发送确认机制,只是保证消息被传送到目标交换器,而然当交换器没有绑定的队列或根据类型和路由键匹配不到队列时,消息也会丢失。
客户端在发送消息时可以通过设置mandatory 参数为true,让RabbitMQ服务器返回给客户端应答来保证消息成功被路由:
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
用法:
//设置 mandatory 参数为true
channel.basicPublish(orderExchange,routing,true,basicProperties,message.getBytes());
//增加消息返回监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消息投递失败,消息返回: "+message);
}
});
4.4 备份交换器
如果生产者发送消息时没有加入mandatory 参数,那么消息在未被路由的情况下会丢失。设置 mandatory 参数为true,同时客户端也要增加返回监听才有意义。除了设置 mandatory 参数这个方法外,RabbitMQ还提供了另一种方式保障未路由的消息不丢失,那就是设置备份交换器。
在创建交换器时,通过设置 alternate-exchange 参数指定一个备份交换器。
//声明备份交换器
channel.exchangeDeclare("order-exchange-backup", BuiltinExchangeType.FANOUT,true,false,null);
//声明备份交换器的关联队列
channel.queueDeclare("order-queue-backup", true, false, false, null);
//绑定
channel.queueBind("order-queue-backup","order-exchange-backup","");
//声明业务交换器,通过 alternate-exchange 参数关联备份交换器
HashMap<String, Object> arg = new HashMap<>();
arg.put("alternate-exchange","order-exchange-backup");
channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,arg);
//声明队列
channel.queueDeclare(orderQueue, true, false, false, null);
//绑定
channel.queueBind(orderQueue,orderExchange,"normalKey");
如果不做额外处理,交换器不能通过消息的routingKey 匹配到目标队列,则消息丢失。如果设置备份交换器和队列,则消息存储在备份交换器绑定的队列中。
特别注意,到达备份交换器的路由键就是原来的路由键。备份交换器与备份队列之间一定要有效绑定,否则未路由的消息还是会丢失。

使用备份交换器也有特殊情况:
- 如果备份交换器不存在,客户端和服务器都不会报错,消息丢失。
- 如果备份交换器没有绑定任何队列,客户端和服务器都不会报错,消息丢失。
- 如果备份交换器没有匹配到任何队列,客户端和服务器都不会报错,消息丢失。
- 备份交换器和mandatory 参数一起使用,那么mandatory 参数无效;
小结:
通过 发送消息加入mandatory 参数和使用备份交换器可以有效提高系统的健壮性。
4.5 持久化机制
持久化即将数据写入磁盘,服务重启后数据不会丢失,系统自动创建持久化的数据。
RabbitMQ内部的持久化分为三部分:
- 交换器持久化
- 队列持久化
- 消息持久化
交换器持久化
交换器的持久化在声明创建时即可指定,第三个 boolean 类型的参数 durable 设置为true 表示持久化该交换器。
Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
声明一个持久化的交换器:
channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,false,null);
队列持久化
队列的持久化在声明创建时即可指定,第二个 boolean 类型的参数 durable 设置为true 表示持久化该队列。
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
声明一个持久化的队列:
channel.queueDeclare(orderQueue, true, false, false, null);
消息持久化
消息的持久化在发布消息时指定,通过 BasicProperties 设置持久化属性。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.deliveryMode(2); //设置持久化消息
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"routingKey",true,basicProperties,message);
特别注意:
要想保证持久化的消息不丢失,则消息相关联的队列与交换器的持久化都要打开,否则消息还是不会持久化。消息存储在队列中,队列如果不持久化,只持久化消息也是没意义的。
4.6 接收确认机制
无论是推送消息还是拉取消息,默认情况下是自动确认的,即消息到达消费者手中后立即确认收到消息,当消费者还没来得处理就重启或宕机的这种情况下,这条消息虽然自动确认了,但是也相当于丢失了。
面对这种情况,RabbitMQ 提供给消费者手动确认机制(Channel 接口提供)。
//手动确认消息
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag :
deliveryTag 是消息标识,代表一条消息。
multiple:
false 表示只确认deliveryTag 这一条消息,true 表示确认 deliveryTag 编号及之前的所有未确认的消息。
4.7 如何保障消息可靠性
业务系统接入消息中间件首先要考虑消息传输的可靠性,消息传输无非分三种情况:
- 最少一次:消息不会丢失,但可能会重复传输。
- 最多一次:消息可能会丢失,但不会重复传输。
- 恰好一次:每条消息只被传输一次。
总结下有四点:
- 事务机制或发送确认机制;
- 设置 mandatory 参数或备份交换器;
- 交换器、队列、消息可以设置为持久化;
- 消费者手动确认消息被正常消费;
4.8 设置过期时间 TTL
RabbitMQ支持给消息和队列设置过期时间。
设置消息的TTL
有两种方式可以设置消息的过期时间,第一种是在声明队列的参数(x-message-ttl)中设置,第二种是在发送消息时单独设置。
//方式一: 声明队列时设置过期时间
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-message-ttl",10000);
channel.queueDeclare(orderQueue, true, false, false, arg);
//方式二: 发送消息时单独设置过期时间
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
basicProperties.builder()
.deliveryMode(2) //持久化消息
.expiration("60000"); //过期时间为60秒
channel.basicPublish(orderExchange,routing,true,basicProperties,message.getBytes());
设置队列的TTL
在声明队列时通过 x-expires 参数设置队列的TTL。
//设置队列的过期时间
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-expires",60000);
channel.queueDeclare(orderQueue, true, false, false, arg);
4.9 死信交换器/队列
当消息在一个队列中变为死信(dead message)时,它就被重新发送到另一个交换器–死信交换器DLX(dead letter exchange),绑定死信交换器的队列称之为死信队列。
消息变为死信有如下几种情况:
- 消息被消费者拒绝
- 消息过期
- 队列达到最大长度/最大容量
死信交换器和其他交换器没有区别,都能够和任意交换器或队列进行绑定。
使用
死信交换器的使用方式和备份交换器差不多:
//声明死信交换器
channel.exchangeDeclare("x.dlx", BuiltinExchangeType.DIRECT,true,false,null);
//声明死信交换器的关联队列
channel.queueDeclare("q.dlx", true, false, false, null);
//绑定
channel.queueBind("x.dlx","q.dlx","dlx.order");
//声明业务交换器,通过参数设置死信交换器
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange","x.dlx");
arg.put("x-dead-letter-routing-key","dlx.order");
arg.put("x-message-ttl",60000);
channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,arg);
//声明队列
channel.queueDeclare(orderQueue, true, false, false, null);
//绑定
channel.queueBind(orderQueue,orderExchange,"normalKey");
死信交换器/队列的工作示意图:

死信交换器和备份交换器
死信交换器和备份交换器的工作机制差不多,都属于其他交换器或队列的“备胎”,除此之外,与其他交换器并没有什么不同。死信交换器和备份交换器主要有两点不同:
- 作用场景不一样。备份交换器在原交换器匹配不到队列时发挥作用,死信交换器在队列中的消息被丢弃时发挥作用;
- 死信交换器可以指定路由键,备份交换器的路由键不能指定,默认只能是原交换器的路由键;
4.10 延迟队列
延迟队列存储的是延迟消息,所谓延迟消息,是指生产者发送消息后,并不想让消费者立刻拿到消息,而是等待特定的时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 并没有直接提供延迟队列功能,通过死信交换器 DLX + 设置队列过期时间 TTL 可以实现延迟队列,上面的例子就是延迟队列的实现。
4.11 优先级队列
在优先级队列中,优先级高的消息最先被消费。(前提:队列中积累有一定量的消息。)
通过设置队列的 x-max-priority 参数来指定队列的最大优先级:
HashMap<String, Object> args = new HashMap<>();
args.put("x-max-priority",10);
channel.queueDeclare(orderQueue, true, false, false, args);
然后在发送消息时,指定消息的优先级:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.priority(6);
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"com.rabbitmq",true,basicProperties,message);
注:
只有当生产消息的速度大于消费消息的速度,队列中积累有一定量的消息时,使用优先级队列才有意义,否则队列中只有一个或少量的消息,设置优先级也就失去了意义。
4.12 惰性队列
RabbitMQ 3.6.0 开始引入惰性队列,惰性队列会尽可能的将消息存入磁盘,而在消费者消费到相应的信息时才会被加载到内存中。它的一个重要的设计目标就是为了支持更长的队列,即存储更多的消息。
如果消息的消费能力大于生产能力,那么队列中不会积压太多消息,惰性队列也就没什么用场。惰性队列适用于消费速度小于生产速度,队列中可能会堆积大量消息的业务场景。
RabbitMQ的队列具有两种模式:default 和 lazy,默认为 default 模式,lazy 即为惰性队列模式。
//声明队列时设置为惰性队列
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-queue-mode","lazy");
channel.queueDeclare(orderQueue, true, false, false, arg);
惰性队列和普通队列相比,存储同样数量和大小的消息,惰性队列只有很小的内存开销,无论队列和消息是否设置为持久化,惰性队列首先会将消息写入磁盘,这会增加I/O开销,对于持久化的消息来说, I/O开销这是不可避免的,所以消息持久化和惰性队列是一组好拍档;而对于非持久化的消息来说,惰性队列牺牲了I/O操作的开销,节省了内存占用的开销,重启后消息一样会丢失。
5. RabbitMQ 服务器运维
5.1 安装启动
安装RabbitMQ之前要事先安装 Erlang 环境, 不同的RabbitMQ 服务版本对应的Erlang 环境版本也不一样,安装前在官网确定对应的版本, RabbitMQ 3.8.7 Release 版本需要的 Erlang/OTP 版本为 21.3 ~ 23.x 。
yum install erlang
安装完 Erlang 环境和 RabbitMQ 服务后,系统会自动注册几个关于 RabbitMQ的服务,如 rabbitmqctl、rabbitmq-server、rabbitmq-plugins 等,直接运行 rabbitmq-server 就可以启动 RabbitMQ 服务。
# 前台方式启动RabbitMQ 服务
rabbitmq-server
## ## RabbitMQ 3.8.3
## ##
########## Copyright (c) 2007-2020 Pivotal Software, Inc.
###### ##
########## Licensed under the MPL 1.1. Website: https://rabbitmq.com
Doc guides: https://rabbitmq.com/documentation.html
Support: https://rabbitmq.com/contact.html
Tutorials: https://rabbitmq.com/getstarted.html
Monitoring: https://rabbitmq.com/monitoring.html
Logs: /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
/var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log
Config file(s): /etc/rabbitmq/rabbitmq.conf
Starting broker... completed with 3 plugins.
前台方式启动RabbitMQ 会打印 服务版本信息,日志文件、配置文件位置等信息。前台方式启动的服务当前窗口不能关闭,否则服务停止。后台方式启动可以使用 systemctl :
# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server
我们还可以使用 rabbitmqctl 工具启动和停止RabbitMQ服务:
# 启动服务
rabbitmqctl start_app
# 查看服务状态
rabbitmqctl status
# 停止服务
rabbitmqctl stop_app
5.2 配置文件
RabbitMQ 的配置文件位于 /etc/rabbitmq/
目录下,可以通过 rabbitmqctl status
查看配置文件的所在位置,默认是 /etc/rabbitmq/rabbitmq.conf ,如果 /etc/rabbitmq/ 目录下面没有该配置文件,手动新建一个 rabbitmq.conf 即可,RabbitMQ 启动时会自动加载该配置文件。
RabbitMQ 3.7 版本之前的配置文件的语法继承于erlang语言,有点类似json
[
%% this is rabbitmq config file
{rabbit, [
{vm_memory_high_watermark_paging_ratio, 0.4},
{vm_memory_high_watermark, 0.4}
]
}
].
语法:
- 注释使用%%
- 最外层一定要使用 [ ]. 注意不要省略英文句号,否则启动报错。
RabbitMQ 3.7 版本及之后,配置文件的语法发生变化,语法更加简单和清爽,符合主流配置文件格式:
# this is rabbitmq config file
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.5
官方在 GitHub上给出了一份配置文件的demo,可以参考使用。
部分内容如下:
# 配置监听端口
listeners.tcp.default = 5672
listeners.ssl.default = 5671
# 接受TCP连接的Erlang进程数
num_acceptors.tcp = 10
num_acceptors.ssl = 10
# 设置每个连接允许的最大通道数, 0表示不限制
channel_max = 128
# 内存使用限制
# 配置内存阀值:(相对方式) 40%
vm_memory_high_watermark.relative = 0.4
# 配置内存阀值:(绝对方式) 2GB (1k = 1024 bytes, 1kB = 1000 bytes)
vm_memory_high_watermark.absolute = 2GB
# 内存换页的阀值
vm_memory_high_watermark_paging_ratio = 0.5
# 磁盘使用限制:相对于RAM总量设置
disk_free_limit.relative = 3.0
# 磁盘使用限制:绝对值
disk_free_limit.absolute = 2GB
# HTTP 监听设置
management.tcp.port = 15672
management.tcp.ip = 0.0.0.0
management.http_log_dir = /path/to/access.log
# HTTPS 监听设置
management.ssl.port = 15671
management.ssl.cacertfile = /path/to/ca_certificate.pem
management.ssl.certfile = /path/to/server_certificate.pem
management.ssl.keyfile = /path/to/server_key.pem
enabled_plugins
设置允许的插件列表,配置文件的格式为 [rabbitmq_management,rabbitmq_management_agent,rabbitmq_web_dispatch].
rabbitmq.conf
设置RabbitMQ的运行时参数。rabbitmq 的配置参数大多都有默认值,很多默认值不需要用户修改就可以很好的工作。rabbitmq.conf 配置文件设置的都是 key-value 形式的参数,这些设置会合并到默认值中。
变量 | 描述 |
---|---|
listeners | 监听AMQP协议的TCP普通端口号,默认值:5672 |
num_acceptors.tcp | 接收TCP监听的Erlang 进程数,默认值:10 |
handshake_timeout | TCP握手的最长时间,默认值:10000,即10秒 |
listeners.ssl | ssl端口号,默认不设置。如有需要可以设置为 5671 |
num_acceptors.ssl | 接收TCP(SSL)监听的Erlang 进程数,默认值:10 |
ssl_handshake_timeout | TCP(SSL)握手的最长时间,默认值:5000,即5秒 |
vm_memory_high_watermark | 内存上限阀值,默认值:0.4,即OS内存可用量的40%; 该值有两种设置方式,方式一是相对于RAM设置比值:vm_memory_high_watermark.relative = 0.6 也可以设置绝对值:vm_memory_high_watermark.absolute = 2GB |
vm_memory_high_watermark_paging_ratio | 开始将内存分页到磁盘的阀值比例,默认值:0.5; |
disk_free_limit | 剩余磁盘空间的阀值,默认值:50M; 该值有两种设置方式,方式一是相对于RAM总量设置:disk_free_limit.relative = 2.0 也可以设置绝对值:disk_free_limit.absolute = 2GB |
log.file.level | 日志打印级别,默认值:info;级别可以是 error、warning、info,debug。 error 只打印错误信息,warning 打印错误和警告信息,info 打印错误、警告和参考信息, debug打印 打印错误、警告、参考信息、调试信息。 |
channel_max | 与客户端协商的使用通道的最大数量,默认值: 2047;设置为0表示不限制, 一般不要这么做,通道越多,占用Broker的内存越多。 |
channel_operation_timeout | 通道操作超时时间(单位:毫秒),默认值:15000,即15秒。 |
max_message_size | 允许发送消息的最大大小(单位:字节)。 默认值:134217728,即128M;最大值为 536870912,即512M; |
heartbeat | 心跳,默认值:60; |
default_vhost | 默认的虚拟主机,默认值:/ |
注:可以参考官方给出的配置文件demo: rabbitmq.conf demo
advanced.config
如果存在 advanced.config 则会自动加载此配置文件,advanced.config 中的内容使用原来的 Erlang 风格书写,配置的都是比key-value 形式稍微复杂的数据格式,如数组,对象等。
advanced.config 中的内容同样会合并到默认值中。advanced.config 和 rabbitmq.conf 都是配置文件,两者以不同的语法各自配置不同的参数,没有主次之分,也不应该发生冲突。
注:可以参考官方给出的配置文件demo: advanced.config demo
rabbitmq-env.conf
设置RabbitMQ的环境变量参数:
RABBITMQ_NODE_PORT 端口设置,默认为5672
RABBITMQ_NODENAME 节点名称,默认为rabbit
MNESIA_BASE 后端存储目录,默认为/var/lib/rabbitmq/mnesia
LOG_BASE 日志目录,默认为/var/log/rabbitmq/
# 指定rabbitmq.conf 配置文件位置
RABBITMQ_CONFIG_FILE
# 指定advanced.config 配置文件位置
RABBITMQ_ADVANCED_CONFIG_FILE
# 指定日志文件位置
RABBITMQ_LOGS
5.3 服务日志
服务日志的默认名称为 rabbit@hostname.log
,存放在 /var/log/rabbitmq/
目录下。
日志文件里会有非常详细的日志内容,包括启动日志
部分启动日志内容如下:
2020-08-31 15:20:19.689 [info] <0.336.0>
Starting RabbitMQ 3.8.3 on Erlang 22.2
Copyright (c) 2007-2020 Pivotal Software, Inc.
Licensed under the MPL 1.1. Website: https://rabbitmq.com
2020-08-31 15:20:19.690 [info] <0.336.0>
node : rabbit@iz2zedkxcfuowarkbygobmz
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : oPplMARfpRhgzwDjswcE8g==
log(s) : /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
: /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log
database dir : /var/lib/rabbitmq/mnesia/rabbit@iz2zedkxcfuowarkbygobmz
2020-08-31 15:20:19.710 [info] <0.342.0>
Memory high watermark set to 3128 MiB (3280946790 bytes) of 7822 MiB (8202366976 bytes) total
2020-08-31 15:20:19.714 [info] <0.344.0> Enabling free disk space monitoring
2020-08-31 15:20:19.714 [info] <0.344.0> Disk free limit set to 50MB
2020-08-31 15:20:19.718 [info] <0.347.0> Limiting to approx 32671 file handles (29401 sockets)
2020-08-31 15:20:19.718 [info] <0.348.0> FHC read buffering: OFF
2020-08-31 15:20:19.718 [info] <0.348.0> FHC write buffering: ON
2020-08-31 15:20:19.718 [info] <0.336.0> Running boot step worker_pool defined by app rabbit
2020-08-31 15:20:19.718 [info] <0.337.0> Will use 4 processes for default worker pool
2020-08-31 15:20:19.718 [info] <0.337.0> Starting worker pool 'worker_pool' with 4 processes in it
2020-08-31 15:20:19.791 [info] <0.409.0> Starting message stores for vhost '/'
2020-08-31 15:20:19.801 [info] <0.453.0> started TCP listener on [::]:5672
2020-08-31 15:20:19.835 [info] <0.505.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2020-08-31 15:20:19.835 [info] <0.611.0> Statistics database started.
5.4 rabbitmqctl 工具
rabbitmqctl是用于管理RabbitMQ服务器节点的命令行工具。 它通过连接到专用CLI工具通信端口上的目标RabbitMQ节点并使用共享密钥(称为cookie文件)进行身份验证来执行所有操作。
# 连通Erlang VM 一起关闭
rabbitmqctl stop
# 只关闭 RabbitMQ服务
rabbitmqctl stop_app
# 查看RabbitMQ服务的状态
rabbitmqctl status
rabbitmqctl list_connections
rabbitmqctl list_channels
rabbitmqctl list_exchanges
rabbitmqctl list_queues
rabbitmqctl list_bindings
[root@iz2zedkxcfuowarkbygobmz ~]# rabbitmqctl status
Status of node rabbit@iz2zedkxcfuowarkbygobmz ...
Runtime
OS PID: 25333
OS: Linux
Uptime (seconds): 65725
RabbitMQ version: 3.8.3
Node name: rabbit@iz2zedkxcfuowarkbygobmz
Erlang configuration: Erlang/OTP 22 [erts-10.6] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe]
Erlang processes: 427 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60
Plugins
Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:
* rabbitmq_management
* rabbitmq_web_dispatch
* rabbitmq_management_agent
* amqp_client
* cowboy
* cowlib
Data directory
Node data directory: /var/lib/rabbitmq/mnesia/rabbit@iz2zedkxcfuowarkbygobmz
Config files
* /etc/rabbitmq/rabbitmq.conf
Log file(s)
* /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
* /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log
Alarms
(none)
Memory
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 3.2809 gb
other_proc: 0.0293 gb (32.48 %)
code: 0.026 gb (28.85 %)
other_system: 0.0137 gb (15.2 %)
allocated_unused: 0.0132 gb (14.7 %)
other_ets: 0.0031 gb (3.41 %)
reserved_unallocated: 0.0015 gb (1.69 %)
atom: 0.0015 gb (1.69 %)
plugins: 0.0011 gb (1.18 %)
metrics: 0.0002 gb (0.23 %)
mgmt_db: 0.0002 gb (0.22 %)
binary: 0.0002 gb (0.18 %)
mnesia: 0.0001 gb (0.08 %)
quorum_ets: 0.0 gb (0.05 %)
msg_index: 0.0 gb (0.03 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
File Descriptors
Total: 2, limit: 32671
Sockets: 0, limit: 29401
Free Disk Space
Low free disk space watermark: 0.05 gb
Free disk space: 36.0165 gb
Totals
Connection count: 0
Queue count: 0
Virtual host count: 1
Listeners
Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API
5.5 rabbitmq-plugins 工具
rabbitmq-plugins 是用于管理 RabbitMQ 插件的命令行工具。
# 查看当前插件的使用情况
rabbitmq-plugins list
[E*] rabbitmq_management 3.8.3
[E*] rabbitmq_management_agent 3.8.3
[ ] rabbitmq_mqtt 3.8.3
[ ] rabbitmq_stomp 3.8.3
[ ] rabbitmq_top 3.8.3
[E*] rabbitmq_web_dispatch 3.8.3
注:
标记为 [E*]
为显示启动,标记为 [e*]
为隐式启动,没有标记说明没有启动。
启用和关闭插件:
# 启用插件
rabbitmq-plugins enable rabbitmq_management
# 关闭插件
rabbitmq-plugins disable rabbitmq_management
注:
开启插件后,还需要重启RabbitMQ 服务才能生效。RabbitMQ的插件存储在plugins 文件夹下,.ez 结尾的文件就是插件。
rabbitmq_management 插件
rabbitmq_management 插件提供了Web管理界面。启用插件后,浏览器访问 localhost:15672
,默认用户名和密码:guest;

rabbitmq_management 插件不仅提供了Web管理界面,还提供了HTTP API接口以供调用。比如创建一个队列,通过PUT方法调用 /api/queues/vhost/name 接口来实现。
HTTP API
RabbitMQ提供的HTTP API接口遵循RESTful 规范。使用四种HTTP 方法提供相应的服务。在Web管理界面的左下角点击 HTTP API 或访问 localhost:15672/api
就可以查看所有的API 接口列表了。
5.6 内存和磁盘
通过配置文件的方式设置内存和剩余磁盘的阀值。
内存阀值
当内存使用量超过阀值后,就会产生内存预警并阻塞所有生产者的连接,直到内存使用量低于阀值。
# 设置内存使用的最高阀值 (方式一:相对于RAM设置比值)
vm_memory_high_watermark.relative = 0.4
# 设置内存使用的最高阀值 (方式二:绝对值)
vm_memory_high_watermark.absolute = 2GB
# 开始将内存分页到磁盘的阀值比例
vm_memory_high_watermark_paging_ratio = 0.5
vm_memory_high_watermark
内存上限阀值,默认值:0.4,即OS内存可用量的40%;该值有两种设置方式,方式一是相对于RAM设置比值:vm_memory_high_watermark.relative = 0.6;也可以设置绝对值:vm_memory_high_watermark.absolute = 2GB ;
vm_memory_high_watermark_paging_ratio
开始将内存分页到磁盘的阀值比例,默认值:0.5;
注:设置的内存使用阀值仅针对于生产者而言,这意味着RabbitMQ 实际使用的总内存量可能大于阀值。
剩余磁盘阀值
当剩余磁盘空间低于阀值时,RabbitMQ 同样会阻塞生产者。
# 设置剩余磁盘空间的阀值(方式一:相对于RAM总量设置比值)
disk_free_limit.relative = 2.0
# 设置剩余磁盘空间的阀值(方式二:绝对值)
disk_free_limit.absolute = 2GB;
disk_free_limit
剩余磁盘空间的阀值,默认值:50M;该值有两种设置方式,方式一是相对于RAM总量设置:disk_free_limit.relative = 2.0 也可以设置绝对值:disk_free_limit.absolute = 2GB;
6. 服务高可用性
RabbitMQ 有两种集群方式:
- 普通集群模式
- 镜像队列模式
注:RabbitMQ中的元数据包括交换机和队列的名称属性、绑定信息、vhost等基础信息,不包括队列中的消息数据。
普通集群
工作策略:
在普通集群模式下,集群中各个节点之间只会相互同步元数据,不会被同步消息数据。如果我们连接到 A
节点,但是消息需要存储在 B
节点,那么内部会将其转发到存储队列数据的节点上进行存储。
问题:
虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,假如这节点挂了,还是会出现单点故障,所以这种普通集群模式并没有达到高可用的目的。
镜像队列
镜像队列模式下,节点之间不仅仅会同步元数据,消息内容也会在镜像节点间同步,可用性更高。这种方案提升了可用性的同时,也会带来网络开销从而在一定程度上会影响到性能。
README
作者:银法王
版权声明:本文遵循知识共享许可协议3.0(CC 协议): 署名-非商业性使用-相同方式共享 (by-nc-sa)
参考:
《RabbitMQ 实战指南》
修改记录:
2020-08-18 第一次修订
2020-08-27 完善知识结构
2020-09-26 第一版定稿