Kafka设计及其原理(二)

Kafka核心特性以及相关设计

Posted by LANY on February 23, 2019

Kafka核心特性

压缩

Kafka支持以集合为单位发送信息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩后,在Consumer端需对消息进行解压,压缩的好处就是减少传输的数据量,并减少网络传输的压力,在大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会消耗部分CPU资源)。

更详细的文章: Kafka Compression

消息可靠性

Kafka提供了多种可能对的消息传递保证,如下:

  • At most once :一条消息可能丢失了但是却只发送了一次
  • At least once :一条消息没有丢失但是却发送了多次
  • Exactly once :一条消息没有丢失却只发送了一次

值得注意的是,这会被分解成两个问题:发布消息的持久保证以及消费消息时的保证。

许多消息系统声称提供exactly once传递,但是阅读细则更重要,大多的声称exactly once会有一个误导(比如,他们不能解释一些现象例如消费者消费数据或者生产者生产数据时会有失败的情况,或者当出现多个消费程序的时候,又或者当数据写入到磁盘时被丢失)。

Kafka处理消息传递的方式:

Producer端看:当一个消息被成Producer发送后,Producer会等待Kafka Broker成功接收到消息后的反馈(可通过参数调节等待时间),如果消息在途中丢失或者Broker宕掉,那么Producer会重新发送该消息(可通过参数控制等待节点收到消息的节点数量)。

Consumer端看:Kafka Broker中记录了每条消息的offset值,这个值指向Consumer即将消费的下一个消息的offset值。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过该offset值找到上一条消息再次进行处理。Consumer还有权限控制offset值,对持久话到broker端的消息做任意处理。

备份机制

当有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群的工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,只有一个节点作为leader节点,该leader节点保存了其他备份节点列表,并维持各个备份间的状态同步。

Kafka备份机制

Kafka高效性相关设计

消息的持久化

Kafka高度依赖文件系统来存储和缓存消息,一般的人认为磁盘是缓慢的,这导致人们对持久化结构具有竞争性持怀疑态度。其实,磁盘远比你想象的要快或者慢,这决定于我们如何使用磁盘。 一个和磁盘性能有关的关键事实是:磁盘驱动器的吞吐量跟寻到延迟是相背离的,也就是所,线性写的速度远远大于随机写。比如:在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是600M/秒,但是随机写的速度只有100K/秒,两者相差将近6000倍。线性读写在大多数应用场景下是可以预测的,因此,操作系统利用read-ahead和write-behind技术来从大的数据块中预取数据,或者将多个逻辑上的写操作组合成一个大写物理写操作中。更多的讨论可以在ACMQueueArtical中找到,他们发现,对磁盘的线性读在有些情况下可以比内存的随机访问要快一些。 为了补偿这个性能上的分歧,现代操作系统都会把空闲的内存用作磁盘缓存,尽管在内存回收的时候会有一点性能上的代价。所有的磁盘读写操作会在这个统一的缓存上进行。 此外,如果我们是在JVM的基础上构建的,熟悉java内存应用管理的人应该清楚以下两件事情:

1.一个对象的内存消耗是非常高的,经常是所存数据的两倍或者更多。
2.随着堆内数据的增多,Java的垃圾回收会变得非常昂贵。

基于这些事实,利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好——我们至少要使得可用的缓存加倍,通过自动访问可用内存,并且通过存储更紧凑的字节结构而不是一个对象,这将有可能再次加倍。这么做的结果就是在一台32GB的机器上,如果不考虑GC惩罚,将最多有28-30GB的缓存。此外,这些缓存将会一直存在即使服务重启,然而进程内缓存需要在内存中重构(10GB缓存需要花费10分钟)或者它需要一个完全冷缓存启动(非常差的初始化性能)。它同时也简化了代码,因为现在所有的维护缓存和文件系统之间内聚的逻辑都在操作系统内部了,这使得这样做比one-off in-process attempts更加高效与准确。如果你的磁盘应用更加倾向于顺序读取,那么read-ahead在每次磁盘读取中实际上获取到这人缓存中的有用数据。 以上这些建议了一个简单的设计:不同于维护尽可能多的内存缓存并且在需要的时候刷新到文件系统中,我们换一种思路。所有的数据不需要调用刷新程序,而是立刻将它写到一个持久化的日志中。事实上,这仅仅意味着,数据将被传输到内核页缓存中并稍后被刷新。我们可以增加一个配置项以让系统的用户来控制数据在什么时候被刷新到物理硬盘上。

常数时间性能保证

消息系统中持久化数据结构的设计通常是维护者一个和消费队列有关的B树或者其它能够随机存取结构的元数据信息。B树是一个很好的结构,可以用在事务型与非事务型的语义中。但是它需要一个很高的花费,尽管B树的操作需要O(logN)。通常情况下,这被认为与常数时间等价,但这对磁盘操作来说是不对的。磁盘寻道一次需要10ms,并且一次只能寻一个,因此并行化是受限的。 直觉上来讲,一个持久化的队列可以构建在对一个文件的读和追加上,就像一般情况下的日志解决方案。尽管和B树相比,这种结构不能支持丰富的语义,但是它有一个优点,所有的操作都是常数时间,并且读写之间不会相互阻塞。这种设计具有极大的性能优势:最终系统性能和数据大小完全无关,服务器可以充分利用廉价的硬盘来提供高效的消息服务。事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着我们可以提供一般消息系统无法提供的特性。比如说,消息被消费后不是立马被删除,我们可以将这些消息保留一段相对比较长的时间(比如一个星期)。

进一步提高效率

我们已经为效率做了非常多的努力。但是有一种非常主要的应用场景是:处理Web活动数据,它的特点是数据量非常大,每一次的网页浏览都会产生大量的写操作。更进一步,我们假设每一个被发布的消息都会被至少一个consumer消费,因此我们更要怒路让消费变得更廉价。 通过上面的介绍,我们已经解决了磁盘方面的效率问题,除此之外,在此类系统中还有两类比较低效的场景:

  • 太多小的I/O操作
  • 过多的字节拷贝

为了减少大量小I/O操作的问题,kafka的协议是围绕消息集合构建的。Producer一次网络请求可以发送一个消息集合,而不是每一次只发一条消息。在server端是以消息块的形式追加消息到log中的,consumer在查询的时候也是一次查询大量的线性数据块。消息集合即MessageSet,实现本身是一个非常简单的API,它将一个字节数组或者文件进行打包。所以对消息的处理,这里没有分开的序列化和反序列化的上步骤,消息的字段可以按需反序列化(如果没有需要,可以不用反序列化)。

另一个影响效率的问题就是字节拷贝。为了解决字节拷贝的问题,kafka设计了一种“标准字节消息”,Producer、Broker、Consumer共享这一种消息格式。Kakfa的message log在broker端就是一些目录文件,这些日志文件都是MessageSet按照这种“标准字节消息”格式写入到磁盘的。 维持这种通用的格式对这些操作的优化尤为重要:持久化log 块的网络传输。流行的unix操作系统提供了一种非常高效的途径来实现页面缓存和socket之间的数据传递。在Linux操作系统中,这种方式被称作:sendfile system call(Java提供了访问这个系统调用的方法:FileChannel.transferTo api)。

为了理解sendfile的影响,需要理解一般的将数据从文件传到socket的路径:

1.操作系统将数据从磁盘读到内核空间的页缓存中
2.应用将数据从内核空间读到用户空间的缓存中
3.应用将数据写回内核空间的socket缓存中
4.操作系统将数据从socket缓存写到网卡缓存中,以便将数据经网络发出

这种操作方式明显是非常低效的,这里有四次拷贝,两次系统调用。如果使用sendfile,就可以避免两次拷贝:操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。 我们期望一个主题上有多个消费者是一种常见的应用场景。利用上述的zero-copy,数据只被拷贝到页缓存一次,然后就可以在每次消费时被重得利用,而不需要将数据存在内存中,然后在每次读的时候拷贝到内核空间中。这使得消息消费速度可以达到网络连接的速度。这样以来,通过页面缓存和sendfile的结合使用,整个kafka集群几乎都已以缓存的方式提供服务,而且即使下游的consumer很多,也不会对整个集群服务造成压力。

Kafka集群部署

集群部署

为了提高性能,推荐采用专用的服务器来部署kafka集群,尽量与hadoop集群分开,因为kafka依赖磁盘读写和大的页面缓存,如果和hadoop共享节点的话会影响其使用页面缓存的性能。 Kafka集群的大小需要根据硬件的配置、生产者消费者的并发数量、数据的副本个数、数据的保存时长综合确定。 磁盘的吞吐量尤为重要,因为通常kafka的瓶颈就在磁盘上。 Kafka依赖于zookeeper,建议采用专用服务器来部署zookeeper集群,zookeeper集群的节点采用奇数个,一般建议用3、5、7个。注意zookeeper集群越大其读写性能越慢,因为zookeeper需要在节点之间同步数据。一个3节点的zookeeper集群允许一个节点失败,一个5节点集群允许2个几点失败。

集群大小

有很多因素决定着kafka集群需要具备存储能力的大小,最准确的衡量办法就是模拟负载来测算一下,Kafka本身也提供了负载测试的工具。 如果不想通过模拟实验来评估集群大小,最好的办法就是根据硬盘的空间需求来推算。下面我就根据网络和磁盘吞吐量需求来做一下估算。 我们做如下假设:

  • W:每秒写多少MB
  • R :副本数
  • C :Consumer的数量

一般的来说,kafka集群瓶颈在于网络和磁盘吞吐量,所以我们先评估一下集群的网络和磁盘需求。 对于每条消息,每个副本都要写一遍,所以整体写的速度是WR。读数据的部分主要是集群内部各个副本从leader同步消息读和集群外部的consumer读,所以集群内部读的速率是(R-1)W,同时,外部consumer读的速度是CW,因此:

  • Write:W*R
  • Read:(R-1)W+CW

需要注意的是,我们可以在读的时候缓存部分数据来减少IO操作,如果一个集群有M MB内存,写的速度是W MB/sec,则允许M/(WR) 秒的写可以被缓存。如果集群有32GB内存,写的速度是50MB/s的话,则可以至少缓存10分钟的数据。

参考: Heaven-Wang