如何保证消息的可靠性传输


在生产环境中,因为机器以及网络设备的不可靠,保证消息的可靠是待解决的问题。在特定场景下消息可能存在丢失风险

消息发送流程

我们可以将 RabbitMQ 消息处理的过程分为三个步骤:

  • 生产阶段:生产者生产消息并且发送到消息队列;
  • 储存阶段:消息队列存储和处理消息;
  • 消费阶段:消息队列将消息转发到消费者。

上述每个步骤都有可能出现消息丢失的风险;
在这里插入图片描述

生产者生产消息并且发送到消息队列

丢失场景

生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,比如:

  • 网络故障。网络环境的不可靠导致消息发送失败,例如网络丢包、网络故障。
  • 数据在网络中传输会经过诸多网络设备,只要其中一个网络链接在数据抵达前已经流量满载,新到的数据将会阻塞一段时间段。

AMQP事务机制

使用AMQP协议的事务机制。生产者在发出消息之后,消息是否到达RabbitMQ服务器是默认不可知的,在生产者发送消息之前,调用channel.txSelect 语句开启事务

  • 如果消息发送失败,那么调用channel.txRollback回滚事务,尝试重新发送一条消息;
  • 如果消息发送成功,那么调用channel.txCommit提交事务。

采用事务的缺点是增加耗时,会降低RabbitMQ的吞吐性能。

Confirm机制

RabbitMQ有一种性能改进方案,即Confirm机制

  • 生产者调用channel.confirmSelect将通信方式设置为confirm模式;
  • 生产者发送的所有消息都会被分配一个唯一 ID;
  • 当生产者发送的消息成功投递到队列之后,RabbitMQ会发送一个ack给生产者,生产者即得知这条消息已经成功发送。
  • 如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,需要重试

我们也可以结合这个机制在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么就需要重新发送

区别

  • 事务机制是同步的,你提交一个事务之后会阻塞在那里
  • confirm机制是异步的,发送消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。

消息队列存储和处理消息

消息存储在 RabbitMQ 队列中,如果队列没有持久化,如果服务器宕机,RabbitMQ 服务器重启后会导致消息丢失。

解决方案

开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,如果 RabbitMQ 自己宕机了,重启之后会自动读取之前存储的数据;

也有可能RabbitMQ 还没持久化,就宕机了,这种情况发生的概率比较小

持久化也可以跟生产者的confirm机制配合起来,只有消息被持久化到磁盘之后,才给生产者发送ack,所以哪怕是出现上面这种情况,RabbitMQ 宕机了,消息丢了,生产者收不到ack,还是回重发的。

消息队列持久化

  • Exchange 持久化:以 Direct 模式为例,将 durable 参数设置为 true。

  • Queue 持久化:将 durable 参数设置为 true,但是这样只能保证持久化 Queue 的元数据,但是不会持久化 Queue 里存储的消息。

  • 消息持久化:发送消息的时候将deliveryMode设置为2,将消息设置为持久化的

  • SpringBoot中的rabbitTemplate默认设置消息是持久化,不需要手动配置

消息队列将消息转发到消费者

消费者在收到消息之后,还没来得及处理消息的消费逻辑,所在机器就宕机了,导致内存中的消息丢失。

解决方案

RabbitMQ 默认采用自动 ACK 机制,在没有处理业务逻辑之前,消费者就会告知消息队列已经成功收到消息,这种方式并不能解决这种问题

  • 我们可以关闭自动ACK模式,通过一个 调用API接口就行,消费完消息后,再返回ACK。这样的话,如果你还没处理完,就没有ACK
  • 这样 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

Author: stream
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source stream !
  TOC