Kafka结构分析

分布式消息队列的优势是很好地对计算机系统进行了解耦,不同系统之间通过消息队列来交换消息。Kafka是分布式消息队列的代表作之一,很好的兼顾了性能和一致性,但代价就是使用起来更加复杂。我花了点时间阅读了Kafka 0.10版本的源码,本文不对繁杂的源码细节和实现做描述,只关注架构和思路。

1 主题 (topic)

主题是消息队列用来区分数据的逻辑概念,可以把不同类型的数据分开,让消费者只获取某个主题的数据。

2 分区 (partition)

知道主题之后,最直观的想法是,一个主题一条队列,这样就能做到只获取某一个主题的数据,这个想法是对的。但分布式的呢,可以想到一个主题多条队列,一条队列放在节点上,这样消费者就能充分发挥分布式IO效率获取该主题的数据。但是这样还不行,万一个节点挂了,存在该节点上的所有队列就丢失了。所以,每一条队列还要有副本,存放在不同的节点上,当某个节点挂了,消费者就可以从别的节点上的副本中找数据。而这条队列,就称为“分区”。

如上所说,每个分区的数据会维持多个副本分散在不同的节点上,其中有一个leader和若干个follower。写入数据时先写到leader中,随后leader将数据拷贝到followers上。如果某个follower挂了不会有影响,因为获取数据时优先从leader获取,kafka就再找一个节点作为该分区的新follower,然后让leader把数据拷贝过去。当leader挂了,就从followers上选出一个新leader。

3 生产者

生产者负责将用户输入的数据存入分区中。

3.1 发送方式

生产者客户端通过类似于 send(data, callback) 的函数发送输入数据,先经过自定义的拦截器做一些处理,下一步将数据序列化,通过哈希函数计算出该存到哪个分区上,然后将数据放入分区对应的缓冲区中。每个缓冲区可以存放多个batch,每个batch有一个内存块,内存块序列存储用户的输入数据。每次请求以batch为单位。输入数据先经过压缩器的压缩,然后插入到内存块中,插入后会记录一个元数据。如果输入数据超出内存块大小就新建一个batch,默认大小是16Mb。

元数据主要存储时间戳,crc,数据所在分区,在batch中的偏移offset,数据大小等参数,这个元数据不会发送出去,会作为用户回调函数的参数,用户可以获得这些参数,具体用途由用户自己决定。

每一个输入数据对应有一个thunk,我们把回调对象和元数据存入该对象中。当该batch发送成功并响应时,会遍历该batch下的所有thunk,调用每个thunk的回调方法。如果没有收到响应就重新发送。

我们开启一个IO线程,线程不断从缓冲区中获取batch。IO线程维护若干个客户端对象,把batch分给其中一个客户端,由它负责发起请求,发起请求可以是多线程的。每发起一个请求后就会把该请求缓存到一个等待响应队列中,当响应成功,就把请求从队列中去掉,我们也可以通过该队列判断客户端的负载情况。

3.2 集群元数据

客户端对象内部维护一个集群元数据,由此知道每个分区应该发送到哪个节点上,这些数据需要从zookeeper服务中获取集群元数据信息,那什么时候获取元数据呢?一个是定期请求,比如3分钟一次,另一个是发送数据时发生异常。一旦发起获取元数据请求,其他线程中的请求需要阻塞,直到元数据成功获取后再唤醒它们。

4 消费者

消费者负责从分区中获取数据。采用主动拉取的方式,通过类似于 poll() 的函数单线程轮询服务端并获得数据。生产者只需要把数据扔进分区中即可,而消费者不单纯把数据从分区中拿出来,它要考虑数据的一致性问题。

比如生产者生成了一个订单,扔进了分区。而消费者从该分区中拿出来,万一在处理过程中消费者挂掉了,那这个订单算怎么回事?你必须确保这个订单能够被正常处理。

4.1 一致性

总体来说,确保数据的一致性有两个方面,一个是确保数据不丢失,另一个是确保数据不重复。

这就是所谓的传递保证(Delivery guarantee semantic):

  • At most once: 消息可能会丢, 但绝不会重复传递。
  • At least once: 消息绝不会丢, 但可能会重复传递。
  • Exactly once: 每条消息只会被传递一次。

确保数据不丢失这个问题,我们可以通过偏移和提交来解决。高可用的消息队列一定会把数据存储起来,而不是单纯的将数据从一个地方传到另一个地方。因为我们使用了分区,所以可以由指定一个消费者去消费某一个分区的数据。而消费者根据偏移每次获取一份数据,处理完后就提交这个偏移。万一中途挂了,因为偏移没有被提交,所以仍然认为这个数据没有被处理。

对于重复的问题,生产者为了确保数据已安全放入分区,需要等一个响应。当没有等到响应时怎么办呢?如果数据发送过程出错,那就重新发送即可。但也有可能数据已经放入分区了,但响应包丢失了,这时生产者重新发送就会造成数据重复。一个比较好的解决办法是,每个消息一个全局唯一ID,如果消费者发现ID重复,就忽略掉。

4.2 分区分配策略

每个消费者会被分配到一个或多个分区,但可能随时加入新的消费者,或移除旧的消费者,不管如何,你要确保所有分区都有消费者在消费。也就是说,当加入新消费者或移除旧消费者,你要为所有消费者重新分配新的分区,这一行为称为再平衡(Rebalance)。

我们把对监听同一个主题的消费者看成是一个组(Consumer Group),我们需要把该主题下的所有分区分配给该组消费者。

4.2.1 Rebalance

在讲怎么分配之前,我们需要先知道Rebalance的过程。

一般操作是,服务端维护了名为GroupCoordinator的中心控制台,当新开启一个消费者,消费者会获取一个GroupCoordinator,消费者会发送消息到中心控制台。中心控制台根据该消费者监听的主题从对应的组里选出一个Leader,然后给该组所有成员发送消息,告诉它们现在有新成员加入要重新分配任务,都把手动上的事情停一停。期间,只有Leader能知道该组所有成员的信息,Leader计算出分区分配结果,然后发给中心控制台,中心控制台再发给该组的所有成员,成员们拿到分区信息后就能开始工作了。

一个消费者同时监听多个主题的情况也是同样适用的,相当于一个消费者同时属于多个组而已。

4.2.2 分配方法

分配方法是可以在启动消费者时自定义的,但使用内置方法也是可以的。

  • 方法一,RangeAssignor:针对每个Topic,n=分区数/消费者数量, m=分区数%消费者数量,前m个消费者每个分配n+1个分区,后面的(消费者数量-m)个消费者每个分配n个Partition。
  • 方法二,RoundRobinAssignor:将所有Topic的Partition按照字典序排列, 然后对每个Consumer进行轮询分配。

举个例子,有C0、C1两个消费者和t0、t1两个Topic,每个Topic有三个分区编号都是0~2。使用RangeAssignor的分配结果是:C0: [t0p0, t0p1, t1p0, t1p1], C1: [t0p2, t1p2];使用RoundRobinAssignor的分配结果是:C0: [t0p0, t0p2, t1p1]、C1: [t0p1, t1p0, t1p2]。

4.3 心跳

服务端是多节点的,GroupCoordinator可能处于不同节点上,当节点挂掉,节点上的GroupCoordinator也就挂掉了。所以要确保在节点挂掉后,消费者能找到新的GroupCoordinator。这时就需要心跳协议,消费者定时向GroupCoordinator发送心跳,如果响应超时则认为该GroupCoordinator挂掉了,消费者需要再次获取一个GroupCoordinator。

反过来,如果GroupCoordinator长时间没有收到消费者的心跳,它也会认为消费者挂掉了,就会触发重新分配(Rebalance)。

5 服务端

服务端负责接收和存储生产者发送的消息,以及将数据提供给消费者。因为是多节点的,我们把每个节点称为Broker。

5.1 存储

在分布式领域中,我们一般把消息或数据看成日志,以日志的方式存储。一个Topic可以有多个分区,每个分区使用偏移量offest来标识一份数据。

part-offest

物理存储结构是以<topic_name>-<partition_id>的目录存储,目录下保护一个分区的日志文件.log,日志文件可以分成多个,以该文件中第一份数据的偏移量作为文件名,比如0003458.log

实际存储时,并不是来一条消息就马上写入文件中,而是尝试将多条消息合并压缩后再写入文件,这样可以节省存储空间。压缩后的结构将变为内层消息和外层消息,内层消息是真实的单条消息,外层消息是内层消息的集合。外层消息的偏移量以最后一条内层消息为准,同时维护一个内部偏移量。

compress

为了提高查询效率,每一个日志文件附带一个索引文件0003458.index。索引文件存储消息的位置和偏移量的对应关系,已知偏移量,可以通过索引文件快速知道该消息在log文件中的位置。为了节省空间,索引文件使用了稀疏存储的方式。

index

通过对索引文件记录的二分查找,可以快速找到消息所在位置的附近,然后顺序查找一下即可。

一个log和它对应的index文件在程序中看作一个LogSegment。一个分区的所有LogSegment以跳表形式存储。

5.2 副本

每个分区维护多个副本,并且会从其副本集合中选出一个副本作为Leader副本,所有的读写请求都由选举出的Leader副本处理。剩余的其他副本都作为Follower副本,Follower副本会从Leader副本处获取消息并更新到自己的Log中。 一般情况下,同一分区的多个副本会被均匀地分配到集群中的不同Broker上,当Leader副本的所在的Broker出现故障后,可以重新选举新的Leader副本继续对外提供服务。

涉及到选举,那么就必须关心谁有资格参与选举。副本的同步涉及到多个节点,从Leader发往Followers,总有一些Followers同步速度跟不上,只有跟上Leader同步进度的Followers才有资格参与选举。所有副本统称为AR(Assigned Repllicas),保持同步的副本称为ISR(In-Sync Replicas),同步滞后的副本称为OSR(Out-Sync Relipcas),能够参与选举的必须是该分区处于ISR的节点。

5.3 Broker Leader

因为存在多个Broker,为了方便管理,需要选出一个Leader来管理其他所有Broker。

不使用Leader的话,每个Broker都各自从ZooKeeper中获取元数据,而ZooKeeper本身的数据也在不断更新,这种情况下,不同Broker发起请求的时间差会导致可能获得到不同的数据,出现“脑裂”,“羊群效应”等问题。

使用Leader后,只有Leader能与ZooKeeper交互,Followers发生了什么问题统一向Leader请求指示,比如某分区Leader副本故障,或新增了消费者等。当然,要是Leader挂了,同样要从Followers中选举出新的Leader。

无锁环形缓冲 Raft分布式系统构建方法论

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×