原文链接 译者:astron 原文作者: Jay Kreps 2014年4月27日
我写过一篇LinkedIn如何使用Apache Kafka作为集中发布订阅日志,以便在应用程序,流处理和Hadoop之间集成数据的博客文章。
为了达到这样的目的,这个“通用日志”必须是一个简单的抽象。如果要将系统用作中央数据中心,那么它必须足够快,可预测和易于扩展,以便将所有数据转储到其中。我的经验是,脆弱或昂贵的系统不可避免地会形成一个阻碍人们无法使用它们的墙壁。一个容易扩展的系统通常最终会成为一个关键的架构构建模块,因为它构建起来最为简单。
我一直喜欢Cassandra的基准测试,它在EC2和Google Compute Engine上的三百台机器上每秒钟写入一百万次。我不知道为什么,也许这是一个Dr. Evil的事情,但每秒钟做一百万次任何事情都很有趣。
无论如何,正如我们将看到的那样,kafka日志的其中一个好处是它便宜。每秒百万次写入并不是特别大的事情。这是因为日志比数据库或键值存储要简单得多。事实上,我们的产品集群全天要产生数千万的读写,而且它们也是运行在廉价硬件上。
让我们做一些基准测试并观测结果。
30秒简单回顾下kafka
为了帮助理解基准测试,让我们快速回顾一下Kafka的内容以及有关它的工作原理的一些细节。 Kafka是一种分布式消息系统,最初是在LinkedIn建立的,现在是Apache Software Foundation的一部分,并被各种公司使用。
一般配置很简单。生产者将记录发送到集群,将其记录,并将其交给消费者:
kafka的重要抽象是主题。生产者将记录发布到主题,消费者订阅一个或多个主题。 Kafka主题只是一个分片上的的预写日志。生产者将记录附加到这些日志和消费者订阅更改。每条记录是一个键/值对。其中的key用于将记录分配给特定日志分区(除非发布者直接指定分区)。
这是一个简单的例子,单一的生产者和消费者从两个分区主题的读和写。
此图显示一个生产者进程追加到两个分区的日志,一个消费者从相同的日志读取。日志中的每个记录都有一个相关的条目号,我们称之为偏移量。消费者使用偏移量来描述它在每个日志中的位置。
这些分区分布集群上,允许主题容纳比任何一台机器上多的数据。
注意,与大多数消息系统不同,日志始终是持久的。当收到消息时直接将其写入文件系统。消息在读取时不会被删除,但按照可配置的SLA保留(比如说几天或一周)。这允许在数据消费者可能需要重新加载数据的情况下使用。它还可以支持节省空间的发布订阅,因为无论消费者多少只有单一共享日志;在传统的消息系统中,通常每个消费者都有一个队列,所以添加一个消费者可以使你的数据量翻倍。这使得Kafka非常适合于正常消息系统外的事情,例如用作Hadoop等离线数据系统的管道。这些离线系统只能作为周期性ETL周期的一部分间隔加载,或者可能会花费几个小时进行维护,在此期间,如果需要,Kafka能够缓冲TB级别未消耗数据。
Kafka还通过复制日志到多台服务器以进行容错。与其他消息传递系统相比,副本实现的一个重要架构是复制不需要复杂的配置,这些配置仅在非常特殊的情况下使用。假定复制是默认的:我们将未复制数据视为副本因子恰好为一个的特殊情况。
当生产者发布包含偏移的消息时,生产者会收到确认。发布到分区的第一条记录返回偏移量0,第二条记录1,并按照序列增长。消费者从偏移指定的位置消费数据,并通过定期提交将位置保存在日志中:保存该偏移量,以防消费者实例崩溃,另一个实例可以从偏移的位置恢复。
希望这些有所帮助(如果没有,你可以在这里阅读更完整的kafka介绍)。
本次基准测试
这个测试是针对主干,因为我对这个基准性能测试进行了一些改进。
自上个完全版本发布以来,并无实质性变化,因此您应该看到与0.8.1相似的结果。我也使用我们新的重写的Java生产者,比以前的生产者客户端提供了更大的吞吐量。
我遵循了一个很好的RabbitMQ基准测试的基本模板,但也覆盖了与Kafka更密切的场景和选项。
关于这个基准测试的哲学观点。对于要公开的基准测试,我喜欢遵循“懒惰基准”的风格。当我们针对应用系统时,针对任何特定用例,通常可以将其调整到极致。这种风格基准测试,大量调整配置,甚至针对每个场景,都会有不同的调整。我认为一个系统的真实测试不是在参数调到极致后开始测试,而是如何执行手边“现成”的系统。尤其对于在具有数十个或数百个用例的多租户设置的系统来说,针对每个用例的调优不仅不切实际但不可能。因此,针对服务器和客户端,我大多使用默认配置参数。我会指出那些我怀疑可以通过微调得到结果改善的地方,但我试图抵制诱惑,避免这种微调带来的改善结果。
我发布了我的准确配置和命令,所以使用你自己的机器也能复现实验结果。
配置
我有六台机器,每个都有以下规格的:
Xeon 2.5 GHz处理器六核
6个7200 RPM SATA驱动器
32GB的RAM
1Gb以太网
Kafka集群设置在三台机器上。六个驱动器直接安装,没有RAID(JBOD风格)。剩下的三台机器我用于Zookeeper并且用于产生负载。
三机集群并不是很大,但是由于我们需要测试到三个副本因子,所以我们需要三台机器。显而易见的是,我们可以随时添加更多的分区并将数据扩展到更多的机器上从而水平扩展我们的集群。测试用的物理机器实际上不是LinkedIn的日常Kafka程序运行的硬件。我们日常使用的kafka机器针对kafka进行的调优,但与我的本意相反,我希望试图在“现成”的通用机器上进行这些测试。相反,我从一个Hadoop集群中借用到了这次实验的机器,这些机器是我们任何持久化系统中最便宜的。 Hadoop与Kafka的使用模式非常相似,所以这是一件合理的事情。
闲话少说,看结果!
生产者吞吐量
对生产者的吞吐量进行压力测试。测试期间没有消费者运行,所以所有的消息都被持久化但不会被读取的(我们稍后会测试生产者和消费者都有的情况)。由于我们最近重写了生产者,本次测试新的代码。
单个生产者线程,单副本
821557个记录/秒(78.3 MB /秒)
对于这个第一个测试,我创建一个具有六个分区并且没有副本的主题。然后,我从单个线程尽可能快地生成了五千万个(100字节)的小记录。关注小记录的原因在于通常对于消息系统而言小记录更难。如果消息很大,很容易以MB /秒获得良好的吞吐量,但是当消息很小时,获得良好的吞吐量更难,因为处理每个消息的开销占主导地位。
整个这个基准测试中,当我报告MB /秒,我只报告每秒请求记录的个数,请求的其他开销都不包括在内。所以实际的网络使用率会比报告的高。例如,使用100字节的消息,每条我们还将传输大约22字节的开销(对于可选的密钥,分隔大小,消息CRC,记录偏移和属性标志)以及请求的一些开销(包括主题,分区,确认等)。这使得我们到达网卡的极限有点困难,但这似乎更合理。因此,在上述结果中,我们可能会在客户端机器上使用千兆网卡。
一个直观结果是,数据远远高于人们期望的,特别是对于持久存储系统。如果您习惯于随机访问数据系统(如数据库或键值存储),则通常期望最大吞吐量大约为每秒5,000到50,000个查询,因为这接近好的RPC层可以执行远程请求的速度。由于两个关键的设计原则,我们超越了它们:
我们努力保证线性磁盘I / O。这些服务器的六个廉价磁盘具有822 MB /秒的线性磁盘I / O的总吞吐量。这实际上远远超出了我们只能使用1千兆网卡。许多消息系统将持久性作为一种严重降低性能的昂贵的附加功能,并认为应该谨慎使用,但这是因为它们不能做线性I / O。
在每个阶段,我们都将小数据数据合并在一起,进行大型网络和磁盘I / O操作。例如,在新生产者中,我们使用“组提交”机制来确保任何记录初始化发送时另一个I / O正在进行分组。要了解更多批处理的重要性,请查看David Patterson的演讲文稿“Latency Lags Bandwidth”。
如果您对细节感兴趣,您可以在我们的设计文档中阅读更多内容。
单个生产者线程,三个副本,异步方式
786980 record / sec(75.1 MB / sec)
此测试与前一个测试完全相同,只是每个分区都有三个副本(因此写入网络或磁盘的总数据是三倍)。如果副本是master从生产者写入分区,如果是follower则获取和写入数据到分区中。测试中的复制是异步的。也就是说,服务器一旦将消息写入本地日志即可确认,而不必等待其他副本确认。这意味着,如果master宕机,它可能会丢失最后几条已写入但尚未复制的消息。消息确认延迟稍微好一些,而在服务器故障的情况下,有副本丢失风险。
关键点是复制是快速的。总共集群写入容量是三倍(因为每条消息写入三次),但是每个客户端的吞吐量仍然相当不错。高性能复制在很大程度上来自于消费者的效率(复制真的只不过是一个特定功能的消费者)。我将在消费者部分讨论。
单个生产者线程,3个副本,同步复制
428823条记录/秒(40.2 MB /秒)
此测试与上述相同,只是现在master服务器在确认回生产者前,要等待来自全部同步副本的确认。在这种模式下,只要一个同步的副本存在,我们保证消息不会丢失。
在Kafka中,同步复制与从异步复制其实并无太大区别。分区的leader总是跟踪follower副本的进度来监控他们的活跃度,直到全部副本确认复制完毕,我们才向消费者发出消息。通过同步复制,我们只需等待生产者请求的响应,直到follower复制完毕。
这个额外的延迟似乎影响了我们的吞吐量。由于服务器上的代码路径非常相似,我们可以通过将激进的批处理调整,允许客户端缓冲更多未完成的请求来改善这种影响。然而,在避免特殊情况调优的原则下,我已经避免了这个测试。
三个生产者,3个副本,异步复制
2024032个记录/秒(193.0 MB /秒)
我们的单个生产者显然不会压测到三个节点的集群。为了增加一点负载,我现在重复之前的异步副本复制测试。但是现在使用运行在三台不同机器上的三个生产者负载生成器(因为网卡瓶颈单机多进程不会有所帮助)。我们可以看看这三个生产者的总体吞吐量,以便更好地了解集群的总容量。
生产者吞吐量与存储数据
许多消息系统的隐藏危险之一是,对于在内存中保留的数据,它们工作的很好。但当数据备份不消耗(因此需要存储在磁盘上)时,它们的吞吐量下降一个数量级(或更多)。这意味着只要您的消费者保持消息队列及时清掉,事情就可以正常运行。但是一旦它们滞后,整个消息层将备份未消耗的数据。备份导致数据进入磁盘,这反过来会导致性能下降到一个速率,这意味着消息传递系统不能再跟上传入的数据,并把它们备份或者直接挂掉。这是非常可怕的,因为在许多情况下,队列的终极目的是优雅地处理这样的情况。
由于Kafka对于未消费的消息,总是保证O(1)的性能。
为了测试这个实验,让我们运行一段更长的时间,并在存储的数据集增长时绘制结果:
该图实际上显示了性能方面的变化,但跟数据大小没有影响,写入TB数据后跟最初写入几百MB性能一样好。
这种差异是由于Linux的I / O管理设施批处理数据,然后定期刷新。我们在Kafka生产集群配置上设置得更好。http://kafka.apache.org/documentation.html#hwandos
消费者吞吐量
现在让我们将注意力转向消费者吞吐量。
注意,复制因子不会影响此测试的结果,因为消费者只能从一个副本读取,无视副本数量。同样,生产者的确认级别也不重要,因为消费者只读取完全确认的消息(即使生产者不等待完全确认)。这是为了确保消费者看到的任何消息是在leader切换(如果当前leader失败)之后。
单个消费者
90452条记录/秒(89.7 MB /秒)
第一次测试,我们将在一个线程从我们的6分区3副本主题消费5000万条消息。
kafka的消费者是非常高效的。它通过从文件系统直接获取日志块来工作。它使用sendfile API直接通过操作系统传输,而无需通过应用程序复制此数据的开销。这个测试实际上是从日志头部开始,所以它正在做真正的I / O读取。然而,在生产环境中,消费者几乎完全从操作系统页面缓存中读取,因为它正在读取刚刚由一些生产者写入的数据(因此仍然被缓存)。实际上,如果您在生产服务器上运行I / O stat,看到即使消耗了大量数据,也没有任何物理读取。
对于期望的Kafka,消费者廉价是很重要的。一方面,副本自身就是消费者,所以让消费者便宜,使得复制便宜。另外,这样可以将数据处理变成一种廉价的操作,因此我们不需要紧紧控制可扩展性。
三个消费者
2615968个记录/秒(249.5 MB /秒)
让我们重复同样的测试,但是运行三个并行的消费者进程,每个进程在不同的机器,并且消耗同一主题。
如预期的那样,基本是线性扩展。(不奇怪,因为我们的模型中的消费者是如此简单)。
生产者和消费者
795064记录/秒(75.8 MB /秒)
上述测试只包括生产者和消费者独立运行。现在我们来做更符合实际情况的事情,一起运行。实际上,在技术上已经这样做了,因为我们的副本复制是把服务器当作消费者来实现的。
我们来运行测试。对于此测试,我们将在六分区3副本主题上运行一个生产者和一个消费者,这些主题将开始为空。生产者使用异步复制。报告的吞吐量是消费者吞吐量(生产者吞吐量的上限)。
正如我们预期的那样,结果基与只有生产者的情况相同 – 消费者相当便宜。
消息大小的影响/
之前报告的是100字节的小消息。较小的消息是消息系统更难的问题,因为它们会放大系统记录的开销。改变记录大小,在按照条数/秒和MB /秒中绘制吞吐量来证明。
正如预期,这个图表显示由于数据变大,我们每秒可以发送的记录的原始记录条数减少。但是,如果在MB /秒图中,随着消息变大,实际用户数据的总字节吞吐量增加:
使用10个字节的消息,实际上被CPU来限制,因为是获取锁和发送消息入队操作,不能最大限度地发挥网络性能。然而,从100字节开始,网络开始饱和(尽管MB / sec持续增加,因为固定大小的字节与发送总字节占比的越来越小)。
端到端延迟
2ms(中位数)
3毫秒(99百分位数)
14毫秒(99.9百分位数)
关于吞吐量,我们已经谈到了很多,但是什么是消息传递的延迟?
也就是说,我们发送给消费者的消息需要多长时间?
对于这个测试,我们将创建生产者和消费者,并针对生产者向kafka集群发送消息需要多长时间,被消费者接收进行多次计时。
注意,Kafka只有所有同步副本确认消息时,才向消费者发出消息。无论是使用同步还是异步复制,这个测试将给出相同的结果。因为该设置只影响生产者的确认。
重复测试
如果你想在自己的机器上尝试这些基准测试,完全没问题。正如我所说,我大多只是使用我们与Kafka一起提供的预装性能测试工具,并且主要使用服务器和客户端的默认配置。您也可以在此处查看有关配置和命令的更多详细信息https://gist.github.com/jkreps/c7ddb4041ef62a900e6c。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/100657.html