消息队列
kafka与rocketMQ
Kafka
如果现在有两个服务A和B,A服务每秒发送200个消息,B服务每秒消费100个消息,那这样用不了多久服务B就会被压垮,那有没有什么办法能在不降低A发送速度的前提下让服务B不被压垮呢?
可以在B服务内加一个队列或者是链表,服务A的消息会加入到内存中,服务B按照自己的消费能力从队列中取消息来消费,然后每个消息都有一个序号,我们叫他offset,用来标记消息消费的进度。这样B 服务依据自己的处理能力,消费链表里的消息。能处理多少是多少,不断更新已处理 Offset 的值。
但这样有一个问题:来不及处理的消息会堆积在内存里,如果服务b宕机或者是重启了,那队列里没消费的消息就丢了,解法也很简单,把这个队列从服务B中抽出来,作为单独的一个进程,这样服务B宕机也不会影响队列里的消息。
这样的一个结构其实就是简单的消息队列,A服务负责向队列中放消息,他就是Producer,服务B负责处理消息就是Consumer。
但这个结构就过于简陋的,高扩展,高可用,高性能一个没有。
高性能
由于服务B的消费能力比较弱,这样消息队列里会不断堆积数据,为了提升性能,我们可以扩展更多的消费者, 这样消费速度就上去了,相对的我们就可以增加更多生产者,提升消息队列的吞吐量。
随着生产者和消费者都变多,我们会发现它们会同时争抢同一个消息队列,抢不到的一方就得等待,这不纯纯浪费时间吗!
解决方案就是对消息进行分类,每一类是一个 topic,然后根据 topic 新增队列的数量,生产者将数据按 topic 投递到不同的队列中,消费者则根据需要订阅不同的 topic。这就大大降低了 topic 队列的压力。
但单个 topic 的消息还是可能过多,我们可以将单个队列,拆成好几段,每段就是一个partition 分区,每个消费者负责一个 partition。这就大大降低了争抢,提升了消息队列的性能。
高扩展
随着 partition 变多,如果 partition 都在同一台机器上的话,就会导致单机 cpu 和内存过高,影响整体系统性能
于是就可以申请更多的机器,将 partition 分散部署在多台机器上,这每一台机器,就代表一个 broker。我们可以通过增加 broker 缓解机器 cpu 过高带来的性能问题。
高可用
到这里,其实还有个问题,如果其中一个 partition 所在的 broker 挂了,那 broker 里所有 partition 的消息就都没了。这高可用还从何谈起?
我们可以给 partition 多加几个副本,也就是 replicas,将它们分为 Leader 和 Follower。Leader 负责应付生产者和消费者的读写请求,而 Follower 只管同步 Leader 的消息。
将 Leader 和 Follower 分散到不同的 broker 上,这样 Leader 所在的 broker 挂了,也不会影响到 Follower 所在的 broker, 并且还能从 Follower 中选举出一个新的 Leader partition 顶上。这样就保证了消息队列的高可用。
持久化和过期策略
刚刚提到的是几个 broker 挂掉的情况,那搞大点,假设所有 broker 都挂了,那岂不是数据全丢了?
为了解决这个问题,我们不能光把数据放内存里,还要持久化到磁盘中,这样哪怕全部 broker 都挂了,数据也不会全丢,重启服务后,也能从磁盘里读出数据,继续工作。
但问题又来了,磁盘总是有限的,这一直往里写数据迟早有一天得炸。所以我们还可以给数据加上保留策略,也就是所谓的 retention policy,比如磁盘数据超过一定大小或消息放置超过一定时间就会被清理掉。
Consumer Group
到这里,这个消息队列好像就挺完美了。但其实还有个问题,按现在的消费方式,每次新增的消费者只能跟着最新的消费 Offset 接着消费。如果我想让新增的消费者从某个 Offset 开始消费呢?听起来这个需求很刁钻?举个例子你明白了。
哪怕 B 服务有多个实例,但本质上,它只有一个消费业务方,新增实例一般也是接着之前的 offset 继续消费。 假设现在来了个新的业务方,C 服务,它想从头开始消费消息队列里的数据,这时候就不能跟在 B 服务的 offset 后边继续消费了。
所以我们还可以给消息队列加入消费者组(consumer group)的概念,B 和 C 服务各自是一个独立的消费者组,不同消费者组维护自己的消费进度,互不打搅。
ZooKeeper(现已改为KRaft)
但现在还有一个问题,随着我们解决了很多问题,引入了很多组件,什么producer,broker,partition,consumer等,导致现在组件太多了,而且每个组件都有自己的数据和状态,所以还需要有个组件去统一维护这些组件的状态信息,于是我们引入 ZooKeeper 组件。它会定期和 broker 通信,获取 整个 kafka 集群的状态,以此判断 某些 broker 是不是跪了,某些消费组消费到哪了。
RocketMQ
RocketMQ其实和其他消息队列一样,它接受来自生产者的消息,将消息分类,每一类是一个 topic,消费者根据需要订阅 topic,获取里面的消息。那他和Kafka之间有什么区别呢?
总的来说"和 Kafka 相比,RocketMQ 在架构上做了减法,在功能上做了加法"
在架构上做减法
首先回忆一下kafka的架构特点
- 通过多个
topic对消息进行分类。 - 为了提升单个 topic 的并发性能,将单个 topic 拆为多个
partition。 - 为了提升系统扩展性,将多个 partition 分别部署在不同
broker上。 - 为了提升系统的可用性,为 partition 加了多个副本。
- 为了协调和管理 Kafka 集群的数据信息,引入
Zookeeper作为协调节点。
Zookeeper 在 Kafka 架构中会和 broker 通信,维护 Kafka 集群信息。一个新的 broker 连上 Zookeeper 后,其他 broker 就能立马感知到它的加入,像这种能在分布式环境下,让多个实例同时获取到同一份信息的服务,就是所谓的分布式协调服务。
但 Zookeeper 作为一个通用的分布式协调服务,它不仅可以用于服务注册与发现,还可以用于分布式锁、配置管理等场景。 Kafka 其实只用到了它的部分功能,多少有点杀鸡用牛刀的味道。太重了。
所以 RocketMQ 直接将 Zookeeper 去掉,换成了 nameserver,用一种更轻量的方式,管理消息队列的集群信息。生产者通过 nameserver 获取到 topic 和 broker 的路由信息,然后再与 broker 通信,实现服务发现和负载均衡的效果。
在kafka中会将topic拆分成多个partition,而在RocketMQ里叫Queue。
Kafka 中的 partition 会存储完整的消息体,而 RocketMQ 的 Queue 上却只存一些简要信息,比如消息偏移 offset,而消息的完整数据则放到"一个"叫 commitlog 的文件上,通过 offset 我们可以定位到 commitlog 上的某条消息。
Kafka 消费消息,broker 只需要直接从 partition 读取消息返回就好,也就是读第一次就够了。而在 RocketMQ 中,broker 则需要先从 Queue 上读取到 offset 的值,再跑到 commitlog 上将完整数据读出来,也就是需要读两次。
那这里的问题就是明明看起来 Kafka 的设计更高效?为什么 RocketMQ 不采用 Kafka 的设计?
这就得聊到kafka的底层存储,Kafka 的 partition 分区,其实在底层由很多段(segment)组成,每个 segment 可以认为就是个小文件。将消息数据写入到 partition 分区,本质上就是将数据写入到某个 segment 文件下。
我们知道,操作系统的机械磁盘,顺序写的性能会比随机写快很多,差距高达几十倍。为了提升性能,Kafka 对每个小文件都是顺序写。如果只有一个 segment 文件,那写文件的性能会很好。但当 topic 变多之后,topic 底下的 partition 分区也会变多,对应的 partition 底下的 segment 文件也会变多。同时写多个 topic 底下的 partition,就是同时写多个文件,虽然每个文件内部都是顺序写,但多个文件存放在磁盘的不同地方,原本顺序写磁盘就可能劣化变成了随机写。于是写性能就降低了。
而RocketMQ为了缓解同时写多个文件带来的随机写问题,索性将单个 broker 底下的多个 topic 数据,全都写到"一个"逻辑文件 CommitLog 上,这就消除了随机写多文件的问题,将所有写操作都变成了顺序写。大大提升了 RocketMQ 在多 topic 场景下的写性能。
注意上面提到的"一个"是带引号的,虽然逻辑上它是一个大文件,但实际上这个 CommitLog 由多个小文件组成。每个文件的大小是固定的,当一个文件被写满后,会创建一个新的文件来继续存储新的消息。这种方式可以方便地管理和清理旧的消息。
简化备份模型
我们知道Kafka 会将 partiton 分散到多个 broker 中,并为 partiton 配置副本,将 partiton 分为 leader和 follower,也就是主和从。broker 中既可能有 A topic 的主 partiton,也可能有 B topic 的从 partiton。主从 partiton 之间会建立数据同步,本质上就是同步 partiton 底下的 segment 文件数据
RocketMQ 将 broker 上的所有 topic 数据到写到 CommitLog 上。如果还像 Kafka 那样给每个分区单独建立同步通信,就还得将 CommitLog 里的内容拆开,这就还是退化为随机读了。于是 RocketMQ 索性以 broker 为单位区分主从,主从之间同步 CommitLog 文件,保持高可用的同时,也大大简化了备份模型。
在功能上做加法
我们知道,Kafka 支持通过 topic 将数据进行分类,比如订单数据和用户数据是两个不同的 topic,但如果我还想再进一步分类呢?比如同样是用户数据,还能根据 vip 等级进一步分类。假设我们只需要获取 vip6 的用户数据,在 Kafka 里,消费者需要消费 topic 为用户数据的所有消息,再将 vip6 的用户过滤出来。
而 RocketMQ 支持对消息打上标记,也就是打 tag,消费者能根据 tag 过滤所需要的数据。比如我们可以在部分消息上标记 tag=vip6,这样消费者就能只获取这部分数据,省下了消费者过滤数据时的资源消耗。
相当于 RocketMQ 除了支持通过 topic 进行一级分类,还支持通过 tag 进行二级分类。
我们知道 Kafka 支持事务,比如生产者发三条消息 ABC,这三条消息要么同时发送成功,要么同时发送失败。但这个事务似乎跟我们要的不太一样。
写业务代码的时候,我们更想要的事务是,"执行一些自定义逻辑"和"生产者发消息"这两件事,要么同时成功,要么同时失败。而这正是 RocketMQ 支持的事务能力。
加入延时队列
如果我们希望消息投递出去之后,消费者不能立马消费到,而是过个一定时间后才消费,也就是所谓的延时消息,就像文章开头的定时外卖那样。如果我们使用 Kafka, 要实现类似的功能的话,就会很费劲。 但 RocketMQ 天然支持延时队列,我们可以很方便实现这一功能。
加入死信队列
消费消息是有可能失败的,失败后一般可以设置重试。如果多次重试失败,RocketMQ 会将消息放到一个专门的队列,方便我们后面单独处理。这种专门存放失败消息的队列,就是死信队列。Kafka 原生不支持这个功能,需要我们自己实现。
消息回溯
Kafka 支持通过调整 offset 来让消费者从某个地方开始消费,而 RocketMQ,除了可以调整 offset, 还支持调整时间。
所以不那么严谨的说, RocketMQ 本质就是在架构上做了减法,在功能上做了加法的 Kafka。
RocketMQ的性能为什么不如Kafka
在介绍他们之间的性能差异前需要先知道什么是零拷贝。
我们知道,消息队列的消息为了防止进程崩溃后丢失,一般不会放内存里,而是放磁盘上。 那么问题就来了,消息从消息队列的磁盘,发送到消费者,过程是怎么样的呢?
操作系统分为用户空间和内核空间。程序处于用户空间,而磁盘属于硬件,操作系统本质上是程序和硬件设备的一个中间层。程序需要通过操作系统去调用硬件能力。
如果用户想要将数据从磁盘发送到网络。那么就会发生下面这几件事:
程序会发起系统调用read(),尝试读取磁盘数据,
- 磁盘数据从设备拷贝到内核空间的缓冲区。
- 再从内核空间的缓冲区拷贝到用户空间。
程序再发起系统调用write(),将读到的数据发到网络:
- 数据从用户空间拷贝到 socket 发送缓冲区
- 再从 socket 发送缓冲区拷贝到网卡。
整个过程,本机内发生了 2 次系统调用,对应 4 次用户空间和内核空间的切换,以及 4 次数据拷贝。
常见的零拷贝技术就是mmap和sendfile
mmap
mmap 是操作系统内核提供的一个方法,可以将内核空间的缓冲区映射到用户空间。
用了它,整个发送流程就有了一些变化。
程序发起系统调用mmap(),尝试读取磁盘数据,具体情况如下:
- 磁盘数据从设备拷贝到内核空间的缓冲区。
- 内核空间的缓冲区映射到用户空间,这里不需要拷贝。
程序再发起系统调用write(),将读到的数据发到网络:
- 数据从内核空间缓冲区拷贝到 socket 发送缓冲区。
- 再从 socket 发送缓冲区拷贝到网卡。
整个过程,发生了 2 次系统调用,对应 4 次用户空间和内核空间的切换,以及 3 次数据拷贝,对比之前,省下一次内核空间到用户空间的拷贝。
因此mmap 作为一种零拷贝技术,指的是用户空间到内核空间这个过程不需要拷贝,而不是指数据从磁盘到发送到网卡这个过程零拷贝。
sendfile
sendfile,也是内核提供的一个方法,从名字可以看出,就是用来发送文件数据的。
程序发起系统调用sendfile(),内核会尝试读取磁盘数据然后发送,具体情况如下:
- 磁盘数据从设备拷贝到内核空间的缓冲区。
- 内核空间缓冲区里的数据可以直接拷贝到网卡。
整个过程,发生了 1 次系统调用,对应 2 次用户空间和内核空间的切换,以及 2 次数据拷贝。
sendfile作为零拷贝技术,指的就是无cpu拷贝,整个过程参与拷贝的是硬件设备,不耽误cpu执行程序
那kafka为什么更快?
这是因为 RocketMQ 使用的是 mmap 零拷贝技术,而 kafka 使用的是 sendfile。kafka 以更少的拷贝次数以及系统内核切换次数,获得了更高的性能。
但问题又来了,为什么 RocketMQ 不使用 sendfile?参考 kafka 抄个作业也不难啊?
我们来看下 sendfile 函数长啥样。
ssize_t sendfile(int out_fd, int in_fd, off_t* offset, size_t count);
// num = sendfile(xxx);
再来看下 mmap 函数长啥样。
void *mmap(void *addr, size_t length, int prot, int flags,
int fd, off_t offset);
// buf = mmap(xxx)
注释里写的是两个函数的用法,可以看到
mmap返回的是数据的具体内容,应用层能获取到消息内容并进行一些逻辑处理。- 而
sendfile返回的则是发送成功了几个字节数,具体发了什么内容,应用层根本不知道。
而 RocketMQ 的一些功能,需要了解到具体这个消息内容,方便二次投递等,比如将消费失败的消息重新投递到死信队列中,如果 RocketMQ 使用 sendfile,那根本没机会获取到消息内容长什么样子,也就没办法实现一些好用的功能了。
而由于 kafka 没有这些功能特性,追求极致性能,正好可以使用 sendfile。
除了零拷贝以外,kafka 高性能的原因还有很多,比如什么批处理,数据压缩啥的,但那些优化手段 rocketMQ 也都能借鉴,唯独这个零拷贝,那是毫无办法。
因此没有一种架构是完美的,一种架构往往用于适配某些场景,你很难做到既要又要还要。当场景不同,我们就需要做一些定制化改造,通过牺牲一部分能力去换取另一部分能力。做架构,做到最后都是在做折中。