Backend

消息重复-有序-堆积-不丢失

消息重复-有序-堆积-不丢失

KafkaRocketMQRabbitMQ

img

如何处理消息重复消费

首先,处理消息重复消费的办法,就是确保幂等性。所谓幂等就是指一个操作无论执行多少次,产生的结果都是一样的。例如,HTTP 的 PUT 方法是幂等的,因为无论对同一资源执行多少次 PUT 操作,结果都是相同的。

为什么只能保证幂等性才能处理重复消费的问题呢?

从生产者角度

首先从生产者的角度来看,一般我们为了消息的可靠性(也就是确保消息消费成功),那么生产者发送消息后,就需要等待broker的响应。broker会在消费者成功消费消息后,并响应给broker的时候告诉生产者消息消费成功。

但如果响应由于网络原因生产者没有收到,生产者不知道究竟是消息消费失败还是响应延迟,但为了确保可靠性,生产者又重发了一次,此时消息就重复了。

从消费者角度

一个消费者拿到消息并消费成功后,就在要给broker响应的时候挂掉了,那么此时消息消费的offset就没来得及修改,此时另一个消费者顶替他,就会重复消费这个消息

总结以及方法

所以从上述分析我们可以知道,我们无法从生产者和消费者方面处理重复消费的问题。

所以我们只能确保消息的幂等性,也就是同一个消息无论消费多少次结果都一样。

那么大致就有两种方案

基于唯一标识去重

比如给每条消息分配全局唯一id,比如uuid或者是雪花算法id。并将消费的id临时存储在redis或mysql中,但消费者消费消息时先查询数据库或redis是否已经有这个消息的记录,若已处理则直接返回,未处理则执行逻辑并记录 ID。

利用业务本身的幂等性

比如在一些订单的场景下,可以天然利用订单号或者是订单状态之类的来防止重复消费。比如在这些任务通常在执行前会判断订单号是否存在,订单状态是否符合预期等。在一定程度上也能保证幂等


如何保证消息的有序性

保证消息有序性的常见方法如下:

单一生产者和单一消费者

  • 使用单个生产者发送消息到单个队列,并由单个消费者处理消息。这样可以确保消息按照生产者的发送顺序消费。
  • 这种方法简单但容易成为性能瓶颈,无法充分利用并发的优势。

分区与顺序键(Partition Key)

  • 在支持分区(Partition) 的消息队列中(如 Kafka、RocketMQ),可以通过 Partition Key 将消息发送到特定的分区。每个分区内部是有序的,这样可以保证相同 Partition Key 的消息按顺序消费。
  • 例如,在订单处理系统中,可以使用订单号作为 Partition Key,将同一个订单的所有消息路由到同一个分区,确保该订单的消息顺序。

顺序队列(Ordered Queue)

  • 一些消息队列系统(如 RabbitMQ)支持顺序队列,消息在队列中的存储顺序与投递顺序一致。如果使用单个顺序队列,消息将按顺序被消费。
  • 可以使用多个顺序队列来提高并发处理能力,并使用特定规则将消息分配到不同的顺序队列中。

Kafka

Kafka 中的消息在分区内部是有序的。生产者在发送消息时,可以指定分区(Partition)。如果所有相同 Key 的消息都发送到同一个分区,则可以保证这些消息的顺序。

  • 通过配置生产者的hash函数,可以将同一类型的消息发送到相同的分区,保证顺序。
  • 在消费端,使用单线程消费者从特定分区读取消息,可以确保消费的顺序性。

RocketMq

RocketMQ 支持顺序消息(Ordered Messages),生产者可以使用 send 方法将消息发送到指定的分区队列,并使用Message Queue Selector来选择目标队列(本质的实现和 kafka 是一样的)。

  • 消费者端通过顺序消费模式,可以从同一个消息队列中按顺序读取消息,确保消息的顺序性。
  • 通过指定 Message Queue 的选择算法(如按订单 ID 对队列数取模),可以将同一订单的所有消息投递到相同的队列中。

RabbitMQ

通过单个队列可以保证消息的顺序,如果消息需要并发消费,则需要将其路由到不同的顺序队列中。

  • 使用Message Grouping技术,将具有相同属性的消息分组到一个队列中,以确保组内消息的顺序。
  • 通过自定义路由策略,可以将同一业务逻辑的消息发送到相同的队列,从而保证顺序。

但在某些场景下,单个队列和单个消费者的方式可能会因为性能问题无法满足业务需求。我们可以通过多队列的方式,将不同业务 Key 的消息发送到不同的队列中,以提高并发处理能力,同时保证每个 Key 的消息顺序。

  • 将消息分配到不同的队列:例如,可以使用订单 ID 的哈希值,将订单消息发送到多个不同的队列。
  • 每个队列使用一个消费者:确保同一个队列中的消息按顺序被消费。每个消费者只负责处理一个队列中的消息。
  • 多队列方案:生产者根据业务 Key(如订单 ID)的哈希值,将消息发送到不同的队列(如 order_queue_0order_queue_1order_queue_2)。这种方法可以提高并发处理能力,因为每个队列可以有一个独立的消费者。
  • 每个消费者处理一个队列:消费者从指定的队列中消费消息,确保每个队列内的消息按顺序消费。

如何处理消息堆积

首先,消息堆积无非就是两种情况讨论

  1. 生产者生产消息过快
  2. 消费者消费消息过慢

对于第一种情况

生产者生产消息过快,需要注意以下几点

  1. 是否有接口限流?还是说出了bug导致疯狂发消息
  2. 大促活动没做好预估,流量比预期高好几倍

那么相对应的解决方案就是

  1. 添加接口限流,避免流量瞬间涌入
  2. 流浪高峰期做好消息发送速率的限制

对于第二种情况

首先应该观察:

  1. 消费者是否实例是否太少?
  2. 消费者线程是否太少?
  3. 业务逻辑是否调用大量第三方接口?
  4. 是否有大量与业务无关的操作占用了业务线程的时间?比如日志记录,发通知等

处理的手段则是:

  1. 增加消费者实例
  2. 增加消费者线程
  3. 通过缓存,减少查询数据库的此时
  4. 将日志,发通知这种与主业务无关的操作,用异步来处理,避免阻塞主业务线程

降级应对策略

但生产环境真的遇到消息堆积时,需要先采取一些紧急的降级策略,比如:

  • 临时扩展多个消费者队列,kafka中可以临时增加分区数,rocketmq增加队列数,RabbitMQ 可以将消息按照特定规则路由到多个队列中。这样可以在消息堆积时,将不同类型的消息分开处理。
  • 如果挤压严重,可以临时跳过丢弃一些不重要的如日志记录,发送通知的消息
  • 将消息临时批量写入文件/数据库中。

根本上解决问题

临时处理完后,需要从根本上解决问题,不然改天又会发生同样的情况

  1. 单线程消费改成多线程消费
  2. 增加消费者实例
  3. 与业务无关的操作异步化
  4. 使用缓存来减少数据库的查询
  5. 上游做好流量控制

如何保证消息不丢失

这需要生产消息、存储消息和消费消息三个阶段共同努力才能保证消息不丢失。

  • 生产者的消息确认:生产者在发送消息时,需要通过消息确认机制来确保消息成功到达。
  • 存储消息:broker 收到消息后,需要将消息持久化到磁盘上,避免消息因内存丢失。即使消息队列服务器重启或宕机,也可以从磁盘中恢复消息。
  • 消费者的消息确认:消费者在处理完消息后,再向消息队列发送确认(ACK),如果消费者未发送确认,消息队列需要重新投递该消息。

除此之外,如果消费者持续消费失败,消息队列可以自动进行重试或将消息发送到死信队列(DLQ)或通过日志等其他手段记录异常的消息,避免因一时的异常导致消息丢失。

保证消息不丢失需要三方都确认才行。

  1. 生产者发送消息,如果broker未响应或者说报错,生产者应该有重试机制,确保消息成功投送
  2. broker收到消息后应该及时给生产者响应,让生产者知道消息已经抵达broker自己的任务完成了
  3. 消费者收到broker的消息后,不能马上响应ACK,而是应该等到消息消费成功后才返回ACK
  4. 如果消费者一定时间未响应ACK,broker需要重新投送消息,确保消费者响应ACK。才算这个消息消费成功

post.comments