消息中间件之RabbitMQ探索指南

前言

本文讲解和学习 RabbitMQ 服务的各种基本概念,使用方法和原理。

1. RabbitMQ

 RabbitMQ 是实现了AMQP(高级消息队列协议) 的消息中间件。RabbitMQ服务最初由 Rabbit 公司开发,使用Erlang 语言编写,2007年发布并开源,2010年被 VMware 收购。

image-20200828100058021

RabbitMQ 版本: 3.8.8 Release (2020-09月)

客户端 Java版本:5.9.0

为什么使用消息中间件

  • 异步

  • 解耦

  • 削峰

1.1 模型架构

RabbitMQ 整体上是一个生产者与消费者模型,主要负责消息的发送、存储和接收。

image-20200822082155105

2. AMQP协议

 Advanced Message Queuing Protocol (AMQP) 高级消息队列协议是应用层协议,工作在TCP协议之上。愿景:成为所有消息中间件的标准协议。

官网:amqp.org

AMQP协议的版本时间线:

版本 时间
AMQP 0.8 版 2006-06
AMQP 0.9.1版 2008-11-13
AMQP v1.0 final 版 2011-10-07

 2012年10月,高级消息队列协议(AMQP) 成为OASIS国际标准。从时间线看,即便是最后一次发布的 v1.0 final 版本距现在也有很多年的时间了,所以AMQP 现在是一个很成熟和稳定的规范协议。RabbitMQ 服务可以看做是AMQP协议的实现者。

2.1 特点

RabbitMQ 的特性有很多,主要有如下:

特性 描述
可靠性 RabbitMQ 使用一些机制来保证可靠性,如持久化、发送确认和接收确认等。
灵活的路由 在消息进入队列之前,通过交换器来路由消息。
多种协议 RabbitMQ 除了原生的支持AMQP协议外,还支持HTTP、STOMP、MQTT等多种协议。
多语言客户端 RabbitMQ 支持几乎所有常用的开发语言,如Java、Python、PHP、C#、JavaScript等。
支持拓展插件 提供很多可插拔的插件安装。
Web管理界面 RabbitMQ 提供了用户Web管理界面,可以监控和管理消息,查看集群节点等。
扩展性 多个RabbitMQ 节点可以组成一个集群。
高可用性 部分节点出现问题的情况下,队列仍然可用。

3. 基本概念

基本概念总览:

概念 描述
生产者(Producer) 消息的生产者,也就是发布消息的一方;
消费者(Consumer) 消息的消费者,也就是接受消息的一方;
消息(Message) 消息包含两部分,消息体和标签;
服务节点(Broker) 一个Broker是一个RabbitMQ服务
连接(Connection) 表示一个TCP连接
通道(Channel) 建立在TCP上的虚拟连接
虚拟主机(vhost) 用于逻辑隔离
交换器(Exchange) 路由消息到队列或交换器
队列(Queue) 存储消息
绑定(Binding) 交换器与队列,交换器与交换器之间需要绑定
路由键(RoutingKey) 消息路由的关键字,字符串类型,单词之间用英文句号隔开
AMQP协议 高级消息队列协议

3.1 连接 Connection

 一个Connection 表示一个TCP连接,操作系统创建和断开TCP连接的开销很大,所以一般情况下都会复用 Connection 。

使用Java客户端创建一个连接:

private static void connect() throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    //connectionFactory.setVirtualHost("/");
    Connection connection = connectionFactory.newConnection();
}

注:

  • guest 用户(密码 guest)是RabbitMQ默认创建的超级管理员,开发中应该创建新的用户来连接MQ服务器。
  • 一般情况下,会有多个客户端同时连接RabbitMQ服务器,应当为每一个客户端建立一个用户,以便区分每个客户端的连接情况。
  • 虚拟主机是建立在连接上的概念,如果不指定,默认是”/“。

3.2 通道 Channel

 一个连接(Connection)可以创建出多个通道实例(Channel),Channel 是客户端中存在的虚拟连接,多个Channel 共用一个连接,节省资源的同时最大化利用资源。

image-20200823094508592

客户端创建一个Channel:

Channel channel = connection.createChannel();

 无论生产者还是消费者,客户端与RabbitMQ服务器的几乎所有交互都是通过 Channel 对象进行操作的。RabbitMQ官方提供的Java客户端的包名为 com.rabbitmq.clientChannel 接口定义了客户端的所有基本操作,所以想要全面了解Channel 的功能,Channel接口应该重点了解。

 创建完Channel ,就可以声明交换器和队列,进行绑定,发送或接收消息了。

:warning: 特别注意:

 通道 Channel 并不是并发安全的实例,因此不能多线程同时操作。应该每一个线程使用一个Channel,而不是多线程之间共用一个Channel。

3.3 交换器(Exchange)

交换器、队列和绑定是RabbitMQ的三大组件。

交换器用来路由消息,RabbitMQ中将消息路由到队列的组件。

创建交换器

创建交换器:

//创建一个非持久化、非自动删除的 direct 类型的交换器
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT);
//创建一个持久化、自动删除、公开的 topic 类型的交换器
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,true,false,null);

exchangeDeclare 方法有多个重载,它们最终都会调用参数最全的一个:

public interface Channel extends AutoCloseable {
    
  Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable,
            boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;  
}

参数如下:

属性 描述
exchange 交换器名称
type 类型
durable 是否持久化
autoDelete 是否自动删除
internal 是否内部交换器
arguments 参数

详解:

 type:交换器共有四种类型:direct、topic 、fanout 、headers ;

 durable:是否持久化,持久化可以将交换器存储到磁盘,如果交换器没有被删除的情况下,服务器重启后不会丢失相关信息。

 autoDelete:设置为 true 表示自动删除,如果该交换器至少被一个队列或交换器绑定过,并且之后所有绑定的队列或交换器都与之解绑,那么该交换器将自动删除。这个地方容易错误理解,如果该交换器从没有被绑定过,那么不会自动删除。

 internal :设置是否为内置交换器,如果是内置交换器,则生产者不能直接将消息发送到内置交换器,只能通过交换器路由到交换器这一种方式。

 arguments:key-value 形式的参数,如 alternate-exchange等。

四种交换器类型

交换器类型有四种:

  • direct 直接路由
  • topic 主题路由
  • fanout 广播路由
  • headers 头部信息路由

direct 直接路由

路由规则:

 根据Routing Key = Bounding Key 的规则路由消息。

使用场景:

 区分发往同一个交换器的消息。

image-20200819153123309

topic 主题路由

路由规则:

 根据通配符匹配路由消息。

  • 星号 * 代办零个或一个单词。例如:com
  • 井号 # 代办零个、一个或多个单词,单词之间通过英文句号 . 隔开。例如:amqp.org.doc

使用场景:

 通过主题对消息进行区分。

image-20200819153814381

使用场景:

fanout 广播路由

路由规则:

 fanout Exchange 无视Bonding Key 的匹配规则,将消息发送到与该交换器绑定的所有队列中。

使用场景:

 广播消息。

image-20200819154925203

headers 头部信息路由

 headers 类型的交换器无视路由键的规则,在发送消息时,BasicProperties 有一个 headers 属性,用于指定key-value 形式的头部信息。交换器和队里进行绑定时,也会有一个key-value 形式的绑定参数,如果交换器是 headers 类型,那么这两组键值对完全匹配时才能路由成功。

//发送消息的头部参数
HashMap<String, Object> args = new HashMap<>();
args.put("userid",10);      
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.headers(args);
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"com.rabbitmq.",true,basicProperties,message.getBytes());
//绑定参数
	Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

注:

 headers 类型的交换器效率低,而且不实用,很少会使用它。

删除交换器

//删除交换器
channel.exchangeDelete(exchangeName);

3.4 队列(Queue)

RabbitMQ中存储消息的缓存区,先进先出FIFO的数据结构。

RabbitMQ中的消息只能存入队列中,队列必须绑定交换器。

多个消费者可以连接同一个队列,队列中消息被多个消费者轮询消费,但队列中的消息只会被消费一次。

队列只能和交换器进行绑定,而交换器既可以和队列绑定,也可以和交换器绑定。内部交换器只能与交换器进行绑定。

声明队列

Channel 声明队列的方式:

public interface Channel extends AutoCloseable {
    
    //声明队列:MQ服务器自动命名,自动删除,非持久化的队列
    Queue.DeclareOk queueDeclare() throws IOException;
    
    //声明队列:指定队列名称,是否持久化,是否排他,是否自动删除,参数
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
    //异步声明队列:不保证队列声明成功;
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;
    //查询队列是否存在,如果不存在则抛出 IOException 异常,并关闭 channel
    Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
}

参数含义:

参数 描述
queue 队列名称
durable 是否持久化,存入磁盘,服务重启数据不会丢失。
exclusive 是否排他,如果设置为排他队列,那么该队列仅对首次声明它的连接可见,并且当连接断开时,队列自动删除。排他队列适用于一个客户端同时发送和读取消息的场景。
autoDelete 是否自动删除,设置为true表示自动删除,自动删除的前提是之前至少有一个消费者连接到这个队列,并且之后所有的连接都断开时,队列自动删除。如果从没有消费者连接过这个队列,队列是不会自动删除的。
arguments 设置队列的其他参数(键值对形式)。

常用参数

# 设置队列中消息最大条数
x-max-length
# 设置队列最大容量(单位:字节)
x-max-length-bytes
# 设置队列中消息的过期时间(单位:毫秒)
x-message-ttl
# 设置队列的过期时间(单位:毫秒)
x-expires
# 设置队列的最大优先级
x-max-priority
# 设置队列的死信交换器
x-dead-letter-exchange
# 设置死信交换器的路由键
x-dead-letter-routing-key

每个参数都对应着相应的功能,下面会分别详细讲解。

清空队列

使用 channel.queuePurge 清空队列中的所有消息,队列本身不删除。

channel.queuePurge(queueName);

删除队列

//删除队列
channel.queueDelete(queueName);

3.5 绑定(Binding)

交换器与队列的关联规则,通过 Binding Key 将交换器与队列交换器与交换器进行绑定。两个组件之间的绑定关系可以有多个。

API:

交换器和队列的绑定和解绑:

//绑定交换器和队列
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
//解绑队列和交换器
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

交换器和交换器的绑定和解绑:

Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;

发送消息的路由键、交换器类型、绑定键共同决定了消息被路由到哪个队列。

3.6 发送消息

通过 channel.basicPublish 将消息发送到交换器:

API:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;

   void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.deliveryMode(2).//持久化消息
    priority(6);		//设置消息优先级
AMQP.BasicProperties basicProperties = builder.build();
//发送消息
channel.basicPublish(orderExchange,routingKey,true,basicProperties,message);

mandatory 参数:

 默认情况下,在发送消息时如果交换器根据自身的类型和路由键无法匹配到队列,消息将丢弃。设置 mandatory 参数为true,RabbitMQ将会通过 Basic.Return 命令把消息返还给生产者。

immediate 参数:

 如果immediate 参数设置为true,消息被路由到队列后,发现队列上并不存在任何消费者,那么这条消息不会存入当前队列;如果路由到的所有队列都没有消费者时,MQ服务器会通过Basic.Return 命令把消息返还给生产者。

channel.basicPublish(orderExchange,routing,true,basicProperties,message);
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 			AMQP.BasicProperties properties, byte[] body) throws IOException {
    	String message = new String(body);
        System.out.println("消息投递失败,消息返回: "+message);
    }
});

 mandatory 和 immediate 参数有一个设置为true时,就可以增加返回监听了。如果都设置为true,那么返回的消息有可能是交换器匹配不到队列,又或者是所有匹配的队列上没有消费者产生的。如果两个参数都不设置或都为false,则没有必要增加返回监听。

注:

 immediate 参数不建议去设置使用,首先会降低效率,其次会造成多种结果,如果消息被路由到两个队列,一个队列上有消费者,一个没有消费者,那么该消息不会存入无消费者的队列,而且消息不会返还给生产者。如果想实现immediate 参数相关的功能,可以使用设置过期时间来代替。

3.7 接收消息

消费者有两种方式来接收消息,自动推送和手动获取,也叫推模式和拉模式。

  • 推模式
  • 拉模式

推模式即RabbitMQ服务器源源不断的自动推送消息给消费者客户端,而拉模式需要消费者手动的一个个拉取消息。

推模式

消费者使用 channel.basicConsume 方法实现消息的自动推送。

API:

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

queue:

 队列名称

autoAck:

 消息是否自动确认,默认是true,即自动确认。设置为false改为手动确认,可以使系统更加健壮。

callback:

 Consumer 接口类型的回调,消费者订阅队列后,通过该接口接收通知消息。应该避免回调的接口方法执行时间过长,因为这将延迟将消息分发到同一个频道上的其它使用者。

 DefaultConsumer 是该接口的默认空实现,消费者可以通过将DefaultConsumer 子类化实现自己的逻辑。

使用:

boolean autoACK = false;
channel.basicQos(100);
channel.basicConsume(queueName,autoACK,new MyConsumer(channel));

	class MyConsumer extends DefaultConsumer{

        public MyConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, 
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            //接收消息
            String message = new String(body);
            //处理消息
            handleMessage(message);
            //确认消息
            getChannel().basicAck(envelope.getDeliveryTag(),false);
        }
    }

Qos 参数

 采用推模式获取消息,消息会自动推送到消费者客户端,推模式受 Qos 参数的影响,即未确认的消息上限。当未确认的消息数量达到设置的上限后,服务器将不再推送消息给客户端,直到未确认的消息数量低于上限。未确认消息的阀值范围是0到 65535,设置 Qos 选项非常有用,它可以达到很好的限流作用,正常情况下,作为消费者的业务服务器接收MQ中推送的消息,然后处理并确认,如果消息产生的速度大于消费的速度,那么消费者未处理的消息和任务会越来越多,有可能导致消费者出现异常。

 设置Qos 阀值后,消费者未确认的消息达到阀值,说明消费者没有来的及处理这么多消息,消息暂且存在MQ中,起到了很好的限流和削峰作用。

拉模式

消费者使用 channel.basicGet 方法手动拉取消息。

API:

GetResponse basicGet(String queue, boolean autoAck) throws IOException;

使用:

//拉取消息
GetResponse mqResponse = channel.basicGet(queueName, false);
//获取消息体
byte[] body = mqResponse.getBody();

注:

 拉模式不受 Qos 参数的影响,只要队列中有未确认的消息,就能拉取成功。

确认消息

 消费者接收到消息,默认是自动确认,即接收到消息立即自动确认,如果消息没有被消费者正常处理,那么该消息也算丢失了。在获取消息时可以将自动确认关闭,处理完消息后再手动确认消息接受成功。

API:

//消息确认
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag :

 deliveryTag 是消息标识,代表一条消息。

multiple:

 false 表示只确认deliveryTag 这一条消息,true 表示确认 deliveryTag 编号及之前的所有未确认的消息。

拒绝消息

消费者接收到消息,也有可能进行拒绝,使用 Basic.Reject 拒绝单条消息 或 Basic.Nack 拒绝多条消息。

API:

//单条拒绝
void basicReject(long deliveryTag, boolean requeue) throws IOException;
//批量拒绝
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

deliveryTag :

 deliveryTag 是消息标识,代表一条消息,类型是64位长整型,从1开始递增。

requeue :

 拒绝后是否重新入队,true表示重新存入队列,false表示从队列移除该消息。

multiple:

 用于拒绝多个消息时使用,true表示拒绝 deliveryTag 编号及之前的所有未被确认的消息,false 则仅仅拒绝deliveryTag 这一条消息,和 basicReject 方法相同。

使用:

//单条拒绝
channel.basicReject(deliveryTag,false);
//批量拒绝
channel.basicNack(deliveryTag,true,false);

注:

 消息被拒绝后,如果不重新入队,不额外处理,消息将会丢弃。可以通过设置死信交换器/队列来收集消费者拒绝的消息。

3.8 虚拟主机 vhost

 AMQ中的虚拟主机用于逻辑隔离。每一个vhost 相当于一个独立的小型RabbitMQ服务器,拥有自己独立的交换器、队列以及绑定关系,甚至是通道和连接,vhost 之间是绝对隔离的,不同 vhost 之间的交换器和队列是无法进行绑定的。

 RabbitMQ默认创建的vhost 为“/”,如果业务上不需要逻辑隔离,那么使用默认的vhost 也是合理的。但当业务达到一定规模的时候,建议按业务场景归类区分,进行隔离,这样不仅可以保证安全,还便于迁移。

4. 进阶开发

4.1 事务机制

 事务就是一组任务要么全部成功,要么全部失败,不允许存在中间状态。RabbitMQ中的事务与数据库中的事务不一样,RabbitMQ中的事务非常简单,只提供一种消息传输的可靠性。

RabbitMQ与事务相关的方法有3个:

channel.txSelect();		//通道开启事务模式
channel.txCommit();		//提交事务
channel.txRollback();	//回滚事务

用法:

try {
   channel.txSelect();
   channel.basicPublish("exchangeName","routingKey",true,basicProperties,message);
   channel.txCommit();
}catch (Exception e){
   e.printStackTrace();
   channel.txRollback();
}

 如果是循环发送消息,只需要将 channel.basicPublish 和 channel.txCommit、txRollback方法包裹在循环体内,

 事务机制在发送一条消息之后会使生产者阻塞,直到Broker给出回应,才能继续进行下一步操作。使用事务发送消息的效率还是比较低的,降低Broker的消息吞吐量。

 由此RabbitMQ提供了另一种消息传输保障-发送确认机制(publisher confirm)。

4.2 发送确认机制

 消息首先会被发送到服务器的交换器,如果在这一环节发生异常,如交换器不存在,RabbitMQ提供了发送确认机制(publisher confirm)保障数据传输的可靠性。

同步确认

使用 channel.waitForConfirms() 同步等待MQ服务器的确认应答:

//通道开启publisher confirm模式
channel.confirmSelect();
channel.basicPublish("exchangeName","routingKey",true,basicProperties,message.getBytes());
if (!channel.waitForConfirms()){
   //消息发送失败后的处理...
}

waitForConfirms() 方法是阻塞方法,

boolean waitForConfirms()
boolean waitForConfirms(long timeout)

注:

 通道只有开启 confirm 模式,才能调用waitForConfirms() 方法,否则会报错。

异步确认

使用 waitForConfirms() 同步确认方式和使用事务的方式在效率方面区别不是很大,消息吞吐量低,效率都一般。

//通道开启publisher confirm模式
channel.confirmSelect();
channel.basicPublish("exchangeName","routingKey",true,basicProperties,message.getBytes()); 
//服务器的确认应答监听
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
     	//Broker回传ACK命令,表示消息已经到达交换器
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //Broker回传Nack命令
                
    }
});

总结:

 事务机制和 publisher confirm 机制两者是互斥的,如果试图在已开启事务模式的信道上再设置 publisher confirm 模式,则会报错。反过来说,如果试图在已开启 publisher confirm 模式的信道上再开启事务模式,同样会报错。

 事务机制和publisher confirm 机制只是保证消息可以发送到服务器的交换器,如果交换器没有绑定的队列或没有匹配的队列,那么默认情况下这个消息会被丢失,所以此时还需要另一种机制来进一步提供保障,那就是设置 mandatory 参数或备份交换器。

4.3 mandatory 参数

 前面提到,事务机制和发送确认机制,只是保证消息被传送到目标交换器,而然当交换器没有绑定的队列或根据类型和路由键匹配不到队列时,消息也会丢失。

 客户端在发送消息时可以通过设置mandatory 参数为true,让RabbitMQ服务器返回给客户端应答来保证消息成功被路由:

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;

用法:

//设置 mandatory 参数为true
channel.basicPublish(orderExchange,routing,true,basicProperties,message.getBytes());
//增加消息返回监听
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 
                             AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("消息投递失败,消息返回: "+message);
    }
});

4.4 备份交换器

 如果生产者发送消息时没有加入mandatory 参数,那么消息在未被路由的情况下会丢失。设置 mandatory 参数为true,同时客户端也要增加返回监听才有意义。除了设置 mandatory 参数这个方法外,RabbitMQ还提供了另一种方式保障未路由的消息不丢失,那就是设置备份交换器。

 在创建交换器时,通过设置 alternate-exchange 参数指定一个备份交换器。

//声明备份交换器        
channel.exchangeDeclare("order-exchange-backup", BuiltinExchangeType.FANOUT,true,false,null);
//声明备份交换器的关联队列
channel.queueDeclare("order-queue-backup", true, false, false, null);
//绑定
channel.queueBind("order-queue-backup","order-exchange-backup","");

//声明业务交换器,通过 alternate-exchange 参数关联备份交换器
HashMap<String, Object> arg = new HashMap<>();
arg.put("alternate-exchange","order-exchange-backup");
channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,arg);
//声明队列
channel.queueDeclare(orderQueue, true, false, false, null);
//绑定
channel.queueBind(orderQueue,orderExchange,"normalKey");

 如果不做额外处理,交换器不能通过消息的routingKey 匹配到目标队列,则消息丢失。如果设置备份交换器和队列,则消息存储在备份交换器绑定的队列中。

 特别注意,到达备份交换器的路由键就是原来的路由键。备份交换器与备份队列之间一定要有效绑定,否则未路由的消息还是会丢失。

image-20200825170958392

使用备份交换器也有特殊情况:

  1. 如果备份交换器不存在,客户端和服务器都不会报错,消息丢失。
  2. 如果备份交换器没有绑定任何队列,客户端和服务器都不会报错,消息丢失。
  3. 如果备份交换器没有匹配到任何队列,客户端和服务器都不会报错,消息丢失。
  4. 备份交换器和mandatory 参数一起使用,那么mandatory 参数无效;

小结:

 通过 发送消息加入mandatory 参数和使用备份交换器可以有效提高系统的健壮性。

4.5 持久化机制

持久化即将数据写入磁盘,服务重启后数据不会丢失,系统自动创建持久化的数据。

RabbitMQ内部的持久化分为三部分:

  • 交换器持久化
  • 队列持久化
  • 消息持久化

交换器持久化

交换器的持久化在声明创建时即可指定,第三个 boolean 类型的参数 durable 设置为true 表示持久化该交换器。

Exchange.DeclareOk exchangeDeclare(String exchange,
    BuiltinExchangeType type,
    boolean durable,
    boolean autoDelete,
    boolean internal,
    Map<String, Object> arguments) throws IOException;

声明一个持久化的交换器:

channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,false,null);

队列持久化

队列的持久化在声明创建时即可指定,第二个 boolean 类型的参数 durable 设置为true 表示持久化该队列。

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

声明一个持久化的队列:

channel.queueDeclare(orderQueue, true, false, false, null);

消息持久化

消息的持久化在发布消息时指定,通过 BasicProperties 设置持久化属性。

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.deliveryMode(2); //设置持久化消息
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"routingKey",true,basicProperties,message);

特别注意:

 要想保证持久化的消息不丢失,则消息相关联的队列与交换器的持久化都要打开,否则消息还是不会持久化。消息存储在队列中,队列如果不持久化,只持久化消息也是没意义的。

4.6 接收确认机制

 无论是推送消息还是拉取消息,默认情况下是自动确认的,即消息到达消费者手中后立即确认收到消息,当消费者还没来得处理就重启或宕机的这种情况下,这条消息虽然自动确认了,但是也相当于丢失了。

 面对这种情况,RabbitMQ 提供给消费者手动确认机制(Channel 接口提供)。

//手动确认消息
void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag :

 deliveryTag 是消息标识,代表一条消息。

multiple:

 false 表示只确认deliveryTag 这一条消息,true 表示确认 deliveryTag 编号及之前的所有未确认的消息。

4.7 如何保障消息可靠性

业务系统接入消息中间件首先要考虑消息传输的可靠性,消息传输无非分三种情况:

  • 最少一次:消息不会丢失,但可能会重复传输。
  • 最多一次:消息可能会丢失,但不会重复传输。
  • 恰好一次:每条消息只被传输一次。

总结下有四点:

  1. 事务机制或发送确认机制;
  2. 设置 mandatory 参数或备份交换器;
  3. 交换器、队列、消息可以设置为持久化;
  4. 消费者手动确认消息被正常消费;

4.8 设置过期时间 TTL

RabbitMQ支持给消息和队列设置过期时间。

设置消息的TTL

有两种方式可以设置消息的过期时间,第一种是在声明队列的参数(x-message-ttl)中设置,第二种是在发送消息时单独设置。

//方式一: 声明队列时设置过期时间
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-message-ttl",10000);
channel.queueDeclare(orderQueue, true, false, false, arg);
//方式二: 发送消息时单独设置过期时间
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties();
basicProperties.builder()
    .deliveryMode(2)    //持久化消息
    .expiration("60000");   //过期时间为60秒
channel.basicPublish(orderExchange,routing,true,basicProperties,message.getBytes());

设置队列的TTL

在声明队列时通过 x-expires 参数设置队列的TTL。

//设置队列的过期时间
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-expires",60000);
channel.queueDeclare(orderQueue, true, false, false, arg);

4.9 死信交换器/队列

 当消息在一个队列中变为死信(dead message)时,它就被重新发送到另一个交换器–死信交换器DLX(dead letter exchange),绑定死信交换器的队列称之为死信队列。

消息变为死信有如下几种情况:

  • 消息被消费者拒绝
  • 消息过期
  • 队列达到最大长度/最大容量

死信交换器和其他交换器没有区别,都能够和任意交换器或队列进行绑定。

使用

死信交换器的使用方式和备份交换器差不多:

//声明死信交换器
channel.exchangeDeclare("x.dlx", BuiltinExchangeType.DIRECT,true,false,null);
//声明死信交换器的关联队列
channel.queueDeclare("q.dlx", true, false, false, null);
//绑定
channel.queueBind("x.dlx","q.dlx","dlx.order");

//声明业务交换器,通过参数设置死信交换器
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-dead-letter-exchange","x.dlx");
arg.put("x-dead-letter-routing-key","dlx.order");
arg.put("x-message-ttl",60000);
channel.exchangeDeclare(orderExchange, BuiltinExchangeType.TOPIC,true,false,arg);
//声明队列
channel.queueDeclare(orderQueue, true, false, false, null);
//绑定
channel.queueBind(orderQueue,orderExchange,"normalKey");

死信交换器/队列的工作示意图:

image-20200828092528759

死信交换器和备份交换器

 死信交换器和备份交换器的工作机制差不多,都属于其他交换器或队列的“备胎”,除此之外,与其他交换器并没有什么不同。死信交换器和备份交换器主要有两点不同:

  • 作用场景不一样。备份交换器在原交换器匹配不到队列时发挥作用,死信交换器在队列中的消息被丢弃时发挥作用;
  • 死信交换器可以指定路由键,备份交换器的路由键不能指定,默认只能是原交换器的路由键;

4.10 延迟队列

 延迟队列存储的是延迟消息,所谓延迟消息,是指生产者发送消息后,并不想让消费者立刻拿到消息,而是等待特定的时间后,消费者才能拿到这个消息进行消费。

 RabbitMQ 并没有直接提供延迟队列功能,通过死信交换器 DLX + 设置队列过期时间 TTL 可以实现延迟队列,上面的例子就是延迟队列的实现。

4.11 优先级队列

在优先级队列中,优先级高的消息最先被消费。(前提:队列中积累有一定量的消息。)

通过设置队列的 x-max-priority 参数来指定队列的最大优先级:

HashMap<String, Object> args = new HashMap<>();
args.put("x-max-priority",10);
channel.queueDeclare(orderQueue, true, false, false, args);

然后在发送消息时,指定消息的优先级:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties().builder();
builder.priority(6);
AMQP.BasicProperties basicProperties = builder.build();
channel.basicPublish(orderExchange,"com.rabbitmq",true,basicProperties,message);

注:

 只有当生产消息的速度大于消费消息的速度,队列中积累有一定量的消息时,使用优先级队列才有意义,否则队列中只有一个或少量的消息,设置优先级也就失去了意义。

4.12 惰性队列

 RabbitMQ 3.6.0 开始引入惰性队列,惰性队列会尽可能的将消息存入磁盘,而在消费者消费到相应的信息时才会被加载到内存中。它的一个重要的设计目标就是为了支持更长的队列,即存储更多的消息。

 如果消息的消费能力大于生产能力,那么队列中不会积压太多消息,惰性队列也就没什么用场。惰性队列适用于消费速度小于生产速度,队列中可能会堆积大量消息的业务场景。

 RabbitMQ的队列具有两种模式:default 和 lazy,默认为 default 模式,lazy 即为惰性队列模式。

//声明队列时设置为惰性队列
HashMap<String, Object> arg = new HashMap<>();
arg.put("x-queue-mode","lazy");
channel.queueDeclare(orderQueue, true, false, false, arg);

 惰性队列和普通队列相比,存储同样数量和大小的消息,惰性队列只有很小的内存开销,无论队列和消息是否设置为持久化,惰性队列首先会将消息写入磁盘,这会增加I/O开销,对于持久化的消息来说, I/O开销这是不可避免的,所以消息持久化和惰性队列是一组好拍档;而对于非持久化的消息来说,惰性队列牺牲了I/O操作的开销,节省了内存占用的开销,重启后消息一样会丢失。

5. RabbitMQ 服务器运维

5.1 安装启动

 安装RabbitMQ之前要事先安装 Erlang 环境, 不同的RabbitMQ 服务版本对应的Erlang 环境版本也不一样,安装前在官网确定对应的版本, RabbitMQ 3.8.7 Release 版本需要的 Erlang/OTP 版本为 21.3 ~ 23.x 。

yum install erlang

 安装完 Erlang 环境和 RabbitMQ 服务后,系统会自动注册几个关于 RabbitMQ的服务,如 rabbitmqctl、rabbitmq-server、rabbitmq-plugins 等,直接运行 rabbitmq-server 就可以启动 RabbitMQ 服务。

# 前台方式启动RabbitMQ 服务
rabbitmq-server

  ##  ##      RabbitMQ 3.8.3
  ##  ##
  ##########  Copyright (c) 2007-2020 Pivotal Software, Inc.
  ######  ##
  ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com

  Doc guides: https://rabbitmq.com/documentation.html
  Support:    https://rabbitmq.com/contact.html
  Tutorials:  https://rabbitmq.com/getstarted.html
  Monitoring: https://rabbitmq.com/monitoring.html

  Logs: /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
        /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log

  Config file(s): /etc/rabbitmq/rabbitmq.conf

  Starting broker... completed with 3 plugins.

 前台方式启动RabbitMQ 会打印 服务版本信息,日志文件、配置文件位置等信息。前台方式启动的服务当前窗口不能关闭,否则服务停止。后台方式启动可以使用 systemctl :

# 启动服务
systemctl start rabbitmq-server
# 查看服务状态
systemctl status rabbitmq-server
# 停止服务
systemctl stop rabbitmq-server
# 重启服务
systemctl restart rabbitmq-server

我们还可以使用 rabbitmqctl 工具启动和停止RabbitMQ服务:

# 启动服务
rabbitmqctl start_app
# 查看服务状态
rabbitmqctl status
# 停止服务
rabbitmqctl stop_app

5.2 配置文件

 RabbitMQ 的配置文件位于 /etc/rabbitmq/ 目录下,可以通过 rabbitmqctl status 查看配置文件的所在位置,默认是 /etc/rabbitmq/rabbitmq.conf ,如果 /etc/rabbitmq/ 目录下面没有该配置文件,手动新建一个 rabbitmq.conf 即可,RabbitMQ 启动时会自动加载该配置文件。

 RabbitMQ 3.7 版本之前的配置文件的语法继承于erlang语言,有点类似json

[
%% this is rabbitmq config file
{rabbit, [
        {vm_memory_high_watermark_paging_ratio, 0.4},
        {vm_memory_high_watermark, 0.4}
        ]
}
].

语法:

  • 注释使用%%
  • 最外层一定要使用 [ ]. 注意不要省略英文句号,否则启动报错。

RabbitMQ 3.7 版本及之后,配置文件的语法发生变化,语法更加简单和清爽,符合主流配置文件格式:

# this is rabbitmq config file
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.5

官方在 GitHub上给出了一份配置文件的demo,可以参考使用。

部分内容如下:

# 配置监听端口
listeners.tcp.default = 5672

listeners.ssl.default = 5671
# 接受TCP连接的Erlang进程数
num_acceptors.tcp = 10
num_acceptors.ssl = 10

# 设置每个连接允许的最大通道数, 0表示不限制
channel_max = 128

# 内存使用限制
# 配置内存阀值:(相对方式) 40%
vm_memory_high_watermark.relative = 0.4
# 配置内存阀值:(绝对方式) 2GB (1k = 1024 bytes, 1kB = 1000 bytes)
vm_memory_high_watermark.absolute = 2GB
# 内存换页的阀值
vm_memory_high_watermark_paging_ratio = 0.5

# 磁盘使用限制:相对于RAM总量设置
disk_free_limit.relative = 3.0 
# 磁盘使用限制:绝对值
disk_free_limit.absolute = 2GB

# HTTP 监听设置
management.tcp.port = 15672
management.tcp.ip   = 0.0.0.0
management.http_log_dir = /path/to/access.log

# HTTPS 监听设置
management.ssl.port       = 15671
management.ssl.cacertfile = /path/to/ca_certificate.pem
management.ssl.certfile   = /path/to/server_certificate.pem
management.ssl.keyfile    = /path/to/server_key.pem

enabled_plugins

 设置允许的插件列表,配置文件的格式为 [rabbitmq_management,rabbitmq_management_agent,rabbitmq_web_dispatch].

rabbitmq.conf

 设置RabbitMQ的运行时参数。rabbitmq 的配置参数大多都有默认值,很多默认值不需要用户修改就可以很好的工作。rabbitmq.conf 配置文件设置的都是 key-value 形式的参数,这些设置会合并到默认值中。

变量 描述
listeners 监听AMQP协议的TCP普通端口号,默认值:5672
num_acceptors.tcp 接收TCP监听的Erlang 进程数,默认值:10
handshake_timeout TCP握手的最长时间,默认值:10000,即10秒
listeners.ssl ssl端口号,默认不设置。如有需要可以设置为 5671
num_acceptors.ssl 接收TCP(SSL)监听的Erlang 进程数,默认值:10
ssl_handshake_timeout TCP(SSL)握手的最长时间,默认值:5000,即5秒
vm_memory_high_watermark 内存上限阀值,默认值:0.4,即OS内存可用量的40%;
该值有两种设置方式,方式一是相对于RAM设置比值:vm_memory_high_watermark.relative = 0.6
也可以设置绝对值:vm_memory_high_watermark.absolute = 2GB
vm_memory_high_watermark_paging_ratio 开始将内存分页到磁盘的阀值比例,默认值:0.5;
disk_free_limit 剩余磁盘空间的阀值,默认值:50M;
该值有两种设置方式,方式一是相对于RAM总量设置:disk_free_limit.relative = 2.0
也可以设置绝对值:disk_free_limit.absolute = 2GB
log.file.level 日志打印级别,默认值:info;级别可以是 error、warning、info,debug。
error 只打印错误信息,warning 打印错误和警告信息,info 打印错误、警告和参考信息,
debug打印 打印错误、警告、参考信息、调试信息。
channel_max 与客户端协商的使用通道的最大数量,默认值: 2047;设置为0表示不限制,
一般不要这么做,通道越多,占用Broker的内存越多。
channel_operation_timeout 通道操作超时时间(单位:毫秒),默认值:15000,即15秒。
max_message_size 允许发送消息的最大大小(单位:字节)。
默认值:134217728,即128M;最大值为 536870912,即512M;
heartbeat 心跳,默认值:60;
default_vhost 默认的虚拟主机,默认值:/

注:可以参考官方给出的配置文件demo: rabbitmq.conf demo

advanced.config

 如果存在 advanced.config 则会自动加载此配置文件,advanced.config 中的内容使用原来的 Erlang 风格书写,配置的都是比key-value 形式稍微复杂的数据格式,如数组,对象等。

 advanced.config 中的内容同样会合并到默认值中。advanced.config 和 rabbitmq.conf 都是配置文件,两者以不同的语法各自配置不同的参数,没有主次之分,也不应该发生冲突。

注:可以参考官方给出的配置文件demo: advanced.config demo

rabbitmq-env.conf

设置RabbitMQ的环境变量参数:

 RABBITMQ_NODE_PORT 端口设置,默认为5672

 RABBITMQ_NODENAME 节点名称,默认为rabbit

 MNESIA_BASE 后端存储目录,默认为/var/lib/rabbitmq/mnesia

 LOG_BASE 日志目录,默认为/var/log/rabbitmq/

# 指定rabbitmq.conf 配置文件位置
RABBITMQ_CONFIG_FILE
# 指定advanced.config 配置文件位置
RABBITMQ_ADVANCED_CONFIG_FILE
# 指定日志文件位置
RABBITMQ_LOGS

5.3 服务日志

服务日志的默认名称为 rabbit@hostname.log,存放在 /var/log/rabbitmq/ 目录下。

日志文件里会有非常详细的日志内容,包括启动日志

部分启动日志内容如下:

2020-08-31 15:20:19.689 [info] <0.336.0> 
 Starting RabbitMQ 3.8.3 on Erlang 22.2
 Copyright (c) 2007-2020 Pivotal Software, Inc.
 Licensed under the MPL 1.1. Website: https://rabbitmq.com
2020-08-31 15:20:19.690 [info] <0.336.0> 
 node           : rabbit@iz2zedkxcfuowarkbygobmz
 home dir       : /var/lib/rabbitmq
 config file(s) : /etc/rabbitmq/rabbitmq.conf
 cookie hash    : oPplMARfpRhgzwDjswcE8g==
 log(s)         : /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
                : /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log
 database dir   : /var/lib/rabbitmq/mnesia/rabbit@iz2zedkxcfuowarkbygobmz
 
2020-08-31 15:20:19.710 [info] <0.342.0> 
Memory high watermark set to 3128 MiB (3280946790 bytes) of 7822 MiB (8202366976 bytes) total
2020-08-31 15:20:19.714 [info] <0.344.0> Enabling free disk space monitoring
2020-08-31 15:20:19.714 [info] <0.344.0> Disk free limit set to 50MB
2020-08-31 15:20:19.718 [info] <0.347.0> Limiting to approx 32671 file handles (29401 sockets)
2020-08-31 15:20:19.718 [info] <0.348.0> FHC read buffering:  OFF
2020-08-31 15:20:19.718 [info] <0.348.0> FHC write buffering: ON
2020-08-31 15:20:19.718 [info] <0.336.0> Running boot step worker_pool defined by app rabbit
2020-08-31 15:20:19.718 [info] <0.337.0> Will use 4 processes for default worker pool
2020-08-31 15:20:19.718 [info] <0.337.0> Starting worker pool 'worker_pool' with 4 processes in it
2020-08-31 15:20:19.791 [info] <0.409.0> Starting message stores for vhost '/' 
2020-08-31 15:20:19.801 [info] <0.453.0> started TCP listener on [::]:5672
2020-08-31 15:20:19.835 [info] <0.505.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2020-08-31 15:20:19.835 [info] <0.611.0> Statistics database started.

5.4 rabbitmqctl 工具

 rabbitmqctl是用于管理RabbitMQ服务器节点的命令行工具。 它通过连接到专用CLI工具通信端口上的目标RabbitMQ节点并使用共享密钥(称为cookie文件)进行身份验证来执行所有操作。

# 连通Erlang VM 一起关闭
rabbitmqctl stop 
# 只关闭 RabbitMQ服务
rabbitmqctl stop_app 
# 查看RabbitMQ服务的状态
rabbitmqctl status

rabbitmqctl list_connections 
rabbitmqctl list_channels
rabbitmqctl list_exchanges
rabbitmqctl list_queues
rabbitmqctl list_bindings
[root@iz2zedkxcfuowarkbygobmz ~]# rabbitmqctl status
Status of node rabbit@iz2zedkxcfuowarkbygobmz ...
Runtime

OS PID: 25333
OS: Linux
Uptime (seconds): 65725
RabbitMQ version: 3.8.3
Node name: rabbit@iz2zedkxcfuowarkbygobmz
Erlang configuration: Erlang/OTP 22 [erts-10.6] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:64] [hipe]
Erlang processes: 427 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /etc/rabbitmq/enabled_plugins
Enabled plugins:

 * rabbitmq_management
 * rabbitmq_web_dispatch
 * rabbitmq_management_agent
 * amqp_client
 * cowboy
 * cowlib

Data directory

Node data directory: /var/lib/rabbitmq/mnesia/rabbit@iz2zedkxcfuowarkbygobmz

Config files

 * /etc/rabbitmq/rabbitmq.conf

Log file(s)

 * /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz.log
 * /var/log/rabbitmq/rabbit@iz2zedkxcfuowarkbygobmz_upgrade.log

Alarms

(none)

Memory

Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 3.2809 gb
other_proc: 0.0293 gb (32.48 %)
code: 0.026 gb (28.85 %)
other_system: 0.0137 gb (15.2 %)
allocated_unused: 0.0132 gb (14.7 %)
other_ets: 0.0031 gb (3.41 %)
reserved_unallocated: 0.0015 gb (1.69 %)
atom: 0.0015 gb (1.69 %)
plugins: 0.0011 gb (1.18 %)
metrics: 0.0002 gb (0.23 %)
mgmt_db: 0.0002 gb (0.22 %)
binary: 0.0002 gb (0.18 %)
mnesia: 0.0001 gb (0.08 %)
quorum_ets: 0.0 gb (0.05 %)
msg_index: 0.0 gb (0.03 %)
connection_other: 0.0 gb (0.0 %)
connection_channels: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 32671
Sockets: 0, limit: 29401

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 36.0165 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Interface: [::], port: 15672, protocol: http, purpose: HTTP API

5.5 rabbitmq-plugins 工具

rabbitmq-plugins 是用于管理 RabbitMQ 插件的命令行工具。

# 查看当前插件的使用情况
rabbitmq-plugins list

[E*] rabbitmq_management               3.8.3
[E*] rabbitmq_management_agent         3.8.3
[  ] rabbitmq_mqtt                     3.8.3
[  ] rabbitmq_stomp                    3.8.3
[  ] rabbitmq_top                      3.8.3
[E*] rabbitmq_web_dispatch             3.8.3

注:

 标记为 [E*] 为显示启动,标记为 [e*] 为隐式启动,没有标记说明没有启动。

启用和关闭插件:

# 启用插件
rabbitmq-plugins enable rabbitmq_management
# 关闭插件
rabbitmq-plugins disable rabbitmq_management

注:

 开启插件后,还需要重启RabbitMQ 服务才能生效。RabbitMQ的插件存储在plugins 文件夹下,.ez 结尾的文件就是插件。

rabbitmq_management 插件

rabbitmq_management 插件提供了Web管理界面。启用插件后,浏览器访问 localhost:15672,默认用户名和密码:guest;

image-20200831211220105

 rabbitmq_management 插件不仅提供了Web管理界面,还提供了HTTP API接口以供调用。比如创建一个队列,通过PUT方法调用 /api/queues/vhost/name 接口来实现。

HTTP API

 RabbitMQ提供的HTTP API接口遵循RESTful 规范。使用四种HTTP 方法提供相应的服务。在Web管理界面的左下角点击 HTTP API 或访问 localhost:15672/api 就可以查看所有的API 接口列表了。

image-20200901093214916

5.6 内存和磁盘

通过配置文件的方式设置内存和剩余磁盘的阀值。

内存阀值

当内存使用量超过阀值后,就会产生内存预警并阻塞所有生产者的连接,直到内存使用量低于阀值。

# 设置内存使用的最高阀值 (方式一:相对于RAM设置比值)
vm_memory_high_watermark.relative = 0.4
# 设置内存使用的最高阀值 (方式二:绝对值)
vm_memory_high_watermark.absolute = 2GB
# 开始将内存分页到磁盘的阀值比例
vm_memory_high_watermark_paging_ratio = 0.5

 vm_memory_high_watermark 内存上限阀值,默认值:0.4,即OS内存可用量的40%;该值有两种设置方式,方式一是相对于RAM设置比值:vm_memory_high_watermark.relative = 0.6;也可以设置绝对值:vm_memory_high_watermark.absolute = 2GB ;

 vm_memory_high_watermark_paging_ratio 开始将内存分页到磁盘的阀值比例,默认值:0.5;

注:设置的内存使用阀值仅针对于生产者而言,这意味着RabbitMQ 实际使用的总内存量可能大于阀值。

剩余磁盘阀值

当剩余磁盘空间低于阀值时,RabbitMQ 同样会阻塞生产者。

# 设置剩余磁盘空间的阀值(方式一:相对于RAM总量设置比值)
disk_free_limit.relative = 2.0 
# 设置剩余磁盘空间的阀值(方式二:绝对值)
disk_free_limit.absolute = 2GB;

 disk_free_limit 剩余磁盘空间的阀值,默认值:50M;该值有两种设置方式,方式一是相对于RAM总量设置:disk_free_limit.relative = 2.0 也可以设置绝对值:disk_free_limit.absolute = 2GB;

6. 服务高可用性

RabbitMQ 有两种集群方式:

  • 普通集群模式
  • 镜像队列模式

注:RabbitMQ中的元数据包括交换机和队列的名称属性、绑定信息、vhost等基础信息,不包括队列中的消息数据。

普通集群

工作策略:

 在普通集群模式下,集群中各个节点之间只会相互同步元数据,不会被同步消息数据。如果我们连接到 A 节点,但是消息需要存储在 B 节点,那么内部会将其转发到存储队列数据的节点上进行存储。

问题:

 虽然说内部可以实现转发,但是因为消息仅仅只是存储在一个节点,假如这节点挂了,还是会出现单点故障,所以这种普通集群模式并没有达到高可用的目的。

镜像队列

 镜像队列模式下,节点之间不仅仅会同步元数据,消息内容也会在镜像节点间同步,可用性更高。这种方案提升了可用性的同时,也会带来网络开销从而在一定程度上会影响到性能。

README

作者:银法王

版权声明:本文遵循知识共享许可协议3.0(CC 协议): 署名-非商业性使用-相同方式共享 (by-nc-sa)

参考:

 RabbitMQ 官方服务器操作手册

 RabbitMQ 服务配置文档

 RabbitMQ 官方客户端操作手册

 阿里云消息队列AMQP手册

 《RabbitMQ 实战指南》

修改记录:

 2020-08-18 第一次修订

 2020-08-27 完善知识结构

 2020-09-26 第一版定稿


消息中间件之RabbitMQ探索指南
http://jackpot-lang.online/2020/08/18/数据库/消息中间件之RabbitMQ探索指南/
作者
Jackpot
发布于
2020年8月18日
许可协议