本文主要讲解 RocketMQ 的消息原理方面相关理论,不涉及消息的底层存储过程

消息在 RocketMQ 中流转的过程与消息可靠性的保证

在研究 RocketMQ 的消息原理之前,我们必须先了解 RocketMQ 的领域模型,以及消息在 RocketMQ 中流转的过程

RocketMQ 领域模型

Apache RocketMQ Main Architecture

如上图所示,RocketMQ 中消息的生命周期主要分为消息生产、消息存储、消息消费三大流程。生产者生产消息并将其发送到 RocketMQ 服务端,消息被存储在服务端的主题中,消费者通过订阅主题来消费消息

消息生产

生产者(Producer):RocketMQ 中用于产生消息的实体,一般继承于业务调用链路的上游。

消息存储

  • 主题(Topic):RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平拓展是由主题内的队列实现的
  • 队列(MessageQueue):RocketMQ 消息传输和存储的实际单元容器。RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征
  • 消息(Message):RocketMQ 的最小传输单元,消息具备不变性,在初始化和完成存储后不可变

消息消费

  • 消费者分组(ConsumerGroup):RocketMQ发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
  • 消费者(Consumer):RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
  • 订阅关系(Subscription):RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

在了解领域模型之后,我们就可以基于这个领域模型,进一步了解消息的流转过程

消息的发送流程

一条消息从生产到被消费,大致会经历以下三个阶段

  • 生产阶段,Producer 新建消息,然后通过网络将消息发送给 MQ Broker服务器
  • 存储阶段,消息存储在 Broker 端的磁盘
  • 消费阶段,Consumer 从 Broker 拉取消息

在这三个阶段,如果发生了网络波动、机器宕机、断电等异常情况,就可能导致消息的丢失。因此,消息的可靠性保证也从这三个阶段展开

消息可靠性的保证

生产阶段

生产者通过网络发送消息给 Broker,当 Broker 收到后,将会返回确认响应消息(ACK)给 Producer。所以只要生产者收到了返回的 ACK,就代表消息在生产过程中没有丢失

发送普通消息的示例代码如下

//普通消息发送。
MessageBuilder messageBuilder = new MessageBuilderImpl();
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}

send 方法是一个同步操作,只要不抛出异常,就说明消息已经发送成功

另外,还有一种异步的 send 方法,但是需要重写一个回调方法,以确保在消息发送成功 or 失败后可以继续后续的业务流程

无论是同步发送还是异步发送,都有可能出现由于网络抖动而导致发送失败的情况。针对这种情况,可以设置合理的重试次数,当发送消息失败时,会自动重试发送

Broker 存储阶段

默认情况下,消息到达 Broker 端后,会先将消息保存在内存中,然后立刻返回 ACK 给 Producer,随后 Broker 会定期地将一组消息异步地写入磁盘,它可以减少磁盘的I/O次数,但是如果机器突然掉电或宕机,内存中的消息没有写入磁盘,就会出现丢失消息的情况

为了确保 Broker 端不丢失消息,Broker 端选择把消息记录到 CommitLog 中,关于记录到 CommitLog,也有两种方式:

  1. 同步刷盘:生产者将消息发送到 Broker 后,只有 Broker 将消息成功记录到 CommitLog 之后,才会返回 ACK。这种可靠性高,但是性能差
  2. 异步刷盘:Broker 将消息写入 CommitLog 采用后台线程异步刷盘的方式,刷盘完成之后回调接口返回发送成功的 ACK。它的性能更好,但是 Broker 宕机时会丢失一部分没有写入到 CommitLog 的消息

消费阶段

Consumer 从 Broker 拉取消息,并执行后续的业务逻辑,一旦执行成功,将会返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态给 Broker

如果 Broker 没有收到ACK,Consumer 下次还会尝试拉取这条消息,进行重试,这样避免了 Consumer 在消费过程中出现了异常,或由于网络抖动而导致消息丢失的情况

消费普通消息的示例代码如下

//消费示例一:使用PushConsumer消费普通消息,只需要在消费监听器中处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费普通消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

消息幂等性的保证

在消息幂等性和消息可靠性的抉择之中,RocketMQ选择了保证消息可靠性,这就意味着在消息中间件层面上消息幂等性得不到保障,消息重复的问题需要开发者在业务层面自行解决

一般情况下,导致消息重复问题的原因可能是网络抖动。例如在消息的消费阶段,由于网络抖动,Consumer 返回给 Broker 的 ACK 丢失了,Broker 自然无法收到来自 Consumer 的 ACK。因此,当 Consumer 再次拉取消息时,就会拉取到已经消费过的消息,这就是消息重复

解决消息重复的方法是通过存储消息的 MessageID,通过在缓存或数据库中查找对应的 MessageID 来判断消息是否已经被消费过。一般可以通过 MySQL 或 Redis 的 Set 来存储消息的MessageID,但要根据具体业务来决定是采用 MySQL 还是采用 Redis

消息堆积问题

消息队列主要的作用是“削峰”,那么消息队列必然需要有一定的消息积压能力来保证后端服务的正常运作。

如果发生了消息积压,意味着 Consumer 的消费速度赶不上 Producer 的生产速度,这时候需要考虑提高 Consumer 的消费能力

  • 如果 Queue 的数量大于 Consumer 的数量,这时候需要做的是 消费者扩容,也就是把 Consumer 的数量增加到和 Queue 的数量一致
  • 如果 Consumer 的数量大于等于 Queue 的数量但还是发生了消息积压问题,这时候需要做的是 消息迁移Queue扩容,具体来讲就是修改消费者的逻辑,让消费者使用一个临时的 Topic,在这个 Topic 下建立设置更多的 Consumer,然后把原来的消息转发到临时的 Topic 上,通过临时的 Topic 的 Consumer 来消费这些堆积的消息

定时消息

定时消息是 RocketMQ 提供的一种高级消息特性,消息被发送到服务端之后,在指定时间后才能被消费者消费

SCHEDULE_TOPIC_XXXX 介绍

SCHEDULE_TOPIC_XXXX 是 RocketMQ 一个系统类型的 Topic,用于标识延时消息

这个 Topic 有 18 个队列,分别唯一对应着 RocketMQ 的 18 个延时等级,对应关系为:queueId = delayTimeLevel - 1

ScheduleMessageService 介绍

这是 Broker 中的一个延时服务,专门消费 Topic 为 SCHEDULE_TOPIC_XXXX 的延时消息,并将其投递到目标 Topic 中

ScheduleMessageService 在启动时,会创建一个定时器 Timer,并根据延迟级别的个数,启动对应数量的 TimerTask,每个 TimerTask 负责一个延迟级别的消费与投递

延时消息的生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态
  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达
  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除

延时消息在 Broker 的轮转流程

  1. Broker 把消息的 Topic 修改为 SCHEDULE_TOPIC_XXX,然后根据本次消息的延时等级计算需要投递到的具体队列。同时还需要把消息原来的 Topic 及其队列信息存储到消息的属性中,便于后面正确投递
  2. 在从 CommitLog 把消息转发到 Queue 的过程中,会计算这个延时消息什么时候进行投递,投递时间 = 消息存储时间 + 延时等级对应的时间
  3. 延时队列服务 ScheduleMessageService 消费这个消息
  4. 从消息属性中取出并设置原来消息的 Topic 和队列信息,存储到 CommitLog,此时这条消息已经完成延时,所以 ConsumeQueue 中的 Message Tag HashCode 需要重新计算消息 Tag 的哈希值再存储
  5. 由于消息的 Topic 已经修改为原来的 Topic,所以直接投递到对应的队列中
  6. 消费者消费这条消息

延时消息的示例代码如下

//定时/延时消息发送
MessageBuilder messageBuilder = new MessageBuilderImpl();;
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

事务消息

事务消息是 RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性

实现事务消息的核心

  • 两阶段提交:第一阶段 Producer 发送 Half 消息到 Broker 测试 RocketMQ 是否正常;Broker只有在收到第二阶段的消息为 Commit 时,Consumer 才能对消息进行消费
  • 事务补偿机制:当 Broker 收到状态为 unknown 的消息或由于网络波动、Producer 宕机导致长时间没有收到第二阶段的提交时,Broker 会调用生产者接口来回查本次事务的状态

事务消息处理流程

  1. 生产者将消息发送至 RocketMQ 服务端。
  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 ACK 确认消息已经发送成功,此时消息被标记为"暂不能投递",这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为 Commit :服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为 Rollback :服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为 Unknown 未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

事务消息生命周期

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端的状态。

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

事务消息的示例代码如下

//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}
//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilderImpl();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}

消息发送重试与消息流控

重试的基本概念

RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

同步发送和异步发送模式均支持消息发送重试。

重试的触发条件

触发消息发送重试机制的条件如下:

  • 客户端消息发送请求调用失败或请求超时
  • 网络异常造成连接失败或请求超时。
  • 服务端节点处于重启或下线等状态造成连接失败。
  • 服务端运行慢造成请求超时。
  • 服务端返回失败错误码
    • 系统逻辑错误:因运行逻辑不正确造成的错误。
    • 系统流控错误:因容量超限造成的流控错误。

对于事物消息,只会进行 透明重试(transparent retries),网络超时或异常等场景不会进行重试

重试流程

Producer 在初始化时设置消息发送最大重试次数,当触发了上述条件时,Producer 客户端会按照重试的最大次数一直重试发送消息,直到消息发送成功或已经达到最大重试次数,并在最后一次重试失败后返回调用错误响应

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或彻底重试失败并返回错误码、抛出异常
  • 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或成功事件返回

重试间隔

  • 除了服务端返回系统流控错误场景,其他触发条件触发重试之后,均会立即重试,无等待间隔
  • 若由于服务端返回了系统流控错误触发了重试,系统会按照 指数退避策略 进行延迟重试。指数退避算法通过以下参数控制重试行为:
    • INITIAL_BACKOFF:第一次失败重试前后需要等待多久,默认值为1s
    • MULTIPLIER:指数退避因子,即退避倍率,默认值为1.6
    • JITTER:随即抖动因子,默认值为0.2
    • MAX_BACKOFF:等待间隔时间上限,默认值为120s
    • MIN_CONNECT_TIMEOUT:最短重试间隔,默认值为20s

流控的基本概念

消息流控指的是系统容量或水位过高, RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

流控的触发条件

  • 存储压力大:参考 消费进度管理 的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。

消费者分类

RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者

PushConsumer

PushConsumers是一种高度封装的消费者类型,消费消息仅通过消费监听器处理业务并返回消费结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端SDK完成。

使用方式

PushConsumer的使用方式比较固定,在消费者初始化时注册一个消费监听器,并在消费监听器内部实现消息处理逻辑。由 RocketMQ 的SDK在后台完成消息获取、触发监听器调用以及进行消息重试处理。

示例代码如下:

// 消费示例:使用PushConsumer消费普通消息。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// 设置消费者分组。
.setConsumerGroup("YourConsumerGroup")
// 设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// 消费消息并返回处理结果。
return ConsumeResult.SUCCESS;
}
})
.build();

PushConsumer的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以Java SDK为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。
  • 返回消费失败:以Java SDK为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。
  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费。

PushConsumer 消费消息时,若消息处理逻辑出现预期之外的阻塞导致消息处理一直无法执行成功,SDK会按照消费超时处理强制提交消费失败结果,并按照消费重试逻辑进行处理。

内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。

SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

使用方式

SimpleConsumer 的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。

示例代码如下:

// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// 设置消费者分组。
.setConsumerGroup("YourConsumerGroup")
// 设置接入点。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置从服务端接受消息的最大等待时间
.setAwaitDuration(Duration.ofSeconds(1))
.build();
try {
// SimpleConsumer 需要主动获取消息,并处理。
List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消费处理完成后,需要主动调用 ACK 提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
}
});
} catch (ClientException e) {
// 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
logger.error("Failed to receive message", e);
}

消费重试

消费重试指的是,消费者在消费某条消息失败后,Apache RocketMQ 服务端会根据重试策略重新消费该消息,超过一定次数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中。

消息重试的触发条件

  • 消费失败,包括消费者返回消息失败状态标识或抛出非预期异常。
  • 消息处理超时,包括在PushConsumer中排队超时。

消息重试策略主要行为

  • 重试过程状态机:控制消息在重试流程中的状态和变化逻辑。
  • 重试间隔:上一次消费失败或超时后,下次重新尝试消费的间隔时间。
  • 最大重试次数:消息可被重试消费的最大次数。

消息重试策略差异

根据消费者类型不同,消息重试策略的具体内部机制和设置方法有所不同,具体差异如下:

消费者类型重试过程状态机重试间隔最大重试次数
PushConsumer已就绪 处理中 待重试 提交死信消费者分组创建时的元数据控制消费者分组创建时的元数据控制
SimpleConsumer已就绪 处理中 提交死信通过API修改获取消息时的不可见时间消费者分组创建时的元数据控制

PushConsumer 消费重试策略

  • Ready:已就绪状态。消息在 RocketMQ 服务端已就绪,可以被消费者消费。
  • Inflight:处理中状态。消息被 Consumer 客户端获取,处于消费中还未返回消费结果的状态。
  • WaitingRetry:待重试状态,PushConsumer 独有的状态。当 Consumer 消息处理失败或消费超时,会触发消费重试逻辑判断。如果当前重试次数未达到最大次数,则该消息变为待重试状态,经过重试间隔后,消息将重新变为已就绪状态可被重新消费。多次重试之间,可通过重试间隔进行延长,防止无效高频的失败。
  • Commit:提交状态。消费成功的状态,消费者返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。您可以通过消费死信队列的消息进行业务恢复。

消息重试过程中,每次重试消息状态都会经过已就绪>处理中>待重试的变化,两次消费间的间隔时间实际由消费耗时及重试间隔控制,消费耗时的最大上限受服务端系统参数控制,一般不应该超过上限时间。

PushConsumer 的最大重试次数由 Consumer 分组创建时的元数据控制

PushConsumer的消费重试示例代码如下

SimpleConsumer simpleConsumer = null;
//消费示例:使用PushConsumer消费普通消息,如果消费失败返回错误,即可触发重试。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//返回消费失败,会自动重试,直至到达最大重试次数。
return ConsumeResult.FAILURE;
}
};

SimpleConsumer 消费重试策略

  • Ready:已就绪状态。消息在 RocketMQ 服务端已就绪,可以被 Consumer 消费。
  • Inflight:处理中状态。消息被 Consumer 客户端获取,处于消费中还未返回消费结果的状态。
  • Commit:提交状态。消费成功的状态,Consumer 返回成功响应即可结束消息的状态机。
  • DLQ:死信状态。消费逻辑的最终兜底机制,若消息一直处理失败并不断进行重试,直到超过最大重试次数还未成功,此时消息不会再重试,会被投递至死信队列。

和 PushConsumer 消费重试策略不同的是,SimpleConsumer 消费者的重试间隔是预分配的,每次获取消息消费者会在调用 API 时设置一个不可见时间参数 InvisibleDuration,即消息的最大处理时长。若消息消费失败触发重试,不需要设置下一次重试的时间间隔,直接复用不可见时间参数的取值。

SimpleConsumer 的最大重试次数由 Consumer 分组创建时的元数据控制

SimpleConsumer的消费重试示例代码如下

//消费示例:使用SimpleConsumer消费普通消息,如果希望重试,只需要静默等待超时即可,服务端会自动重试。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//如果处理失败,希望服务端重试,只需要忽略即可,等待消息再次可见后即可重试获取。
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}