kafka知识点

简述Kafka的架构设计

image-20220221092957081

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干 broker(作为 Kafka 节点的服务器),若干 Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

  • Producer :消息生产者,就是向 kafka broker 发消息的客户端。
  • Consumer :消息消费者,向 kafka broker 取消息的客户端。
  • Topic :可以理解为一个队列,一个 Topic 又分为一个或多个分区,
  • Consumer Group:这是 kafka 用来实现一个 topic 消息的广播(发给所有的 consumer)和单播(发给任意一个 consumer)的手段。一个topic 可以有多个 Consumer Group。
  • Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  • Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker上,每个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的id(offset)。将消息发给 consumer,kafka 只保证按一个 partition 中的消息的顺序,不保证一个 topic 的整体(多个 partition 间)的顺序。
  • Offset:kafka 的存储文件都是按照 offset.kafka 来命名,用 offset 做名字的好处是方便查找。例如你想找位于 2049 的位置,只要找到 2048.kafka 的文件即可。当然 the first offset 就是 00000000000.kafka

Kafka 都有哪些特点?

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写

哪些场景下会选择 Kafka?

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和 Flink

Kafka 分区的目的?

简单来说就是:负载均衡+水平扩展。

Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 randomkey-hash轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。

Kafka 如何做到消息的有序性?

Kafka 无法做到消息全局有序,只能做到 Partition 维度的有序。

kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是不同分区之间的消息是不保证有序的。

所以如果想要消息有序,就需要从 Partition 维度入手:

  1. 单 Partition,单 Consumer。通过此种方案强制消息全部写入同一个 Partition 内,但是同时也牺牲掉了 Kafka 高吞吐的特性了,所以一般不会采用此方案。
  2. 多 Partition,多 Consumer,指定 key 使用特定的 Hash 策略,使其消息落入指定的 Partition 中,从而保证相同的 key 对应的消息是有序的。此方案也是有一些弊端,比如当 Partition 个数发生变化时,相同的 key 对应的消息会落入到其他的 Partition 上,所以一旦确定 Partition 个数后就不能在修改 Partition 个数了。

Kafka 的高可靠性是怎么实现的?

Kafka实现高可靠性使用的是副本机制。通过调节副本相关参数,可使 Kafka 在性能和可靠性之间取得平衡。

Kafka 文件存储机制

Kafka 中消息是以 topic 进行分类的,生产者通过 topic 向 Kafka broker 发送消息,消费者通过 topic 读取数据。然而 topic 在物理层面又能以 partition 为分组,一个 topic 可以分成若干个 partition。事实上,partition 并不是最终的存储粒度,partition 还可以细分为 segment,一个 partition 物理上由多个 segment 组成。

为什么使用不能以 partition 作为存储单位

当 Kafka producer 不断发送消息,必然会引起 partition 文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响。因此,以 segment 为单位将 partition 进一步细分。每个 partition(目录)相当于一个巨型文件被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中消息数量不一定相等)这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 partition 只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours} 等若干参数)决定。

segment 的工作原理

segment 文件由两部分组成,分别为 “.index” 文件和 “.log” 文件,分别表示为 segment 索引文件和数据文件。这两个文件的命令规则为:partition 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值,数值大小为 64 位,20 位数字字符长度,没有数字用 0 填充,如下:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

enter image description here

复制原理和同步方式

为了提高消息的可靠性,Kafka 每个 topic 的 partition 有 N 个副本(replicas),其中 N(大于等于 1)是 topic 的复制因子(replica fator)的个数。Kafka 通过多副本机制实现故障自动转移,当 Kafka 集群中出现 broker 失效时,副本机制可保证服务可用。对于任何一个 partition,它的 N 个 replicas 中,其中一个 replica 为 leader,其他都为 follower,leader 负责处理 partition 的所有读写请求,follower 则负责被动地去复制 leader 上的数据。

enter image description here

如果 leader 所在的 broker 发生故障或宕机,对应 partition 将因无 leader 而不能处理客户端请求,这时副本的作用就体现出来了:一个新 leader 将从 follower 中被选举出来并继续处理客户端的请求。

同步副本ISR

虽然副本极大的增强了可用性,但是副本数量对 Kafka 的吞吐率有一定影响。默认情况下 Kafka 的 replica 数量为 1,即每个 partition 都只有唯一的 leader,无 follower,没有容灾能力。为了确保消息的可靠性,生产环境中,通常将其值(由 broker 的参数 offsets.topic.replication.factor 指定)大小设置为大于 1,比如 3。 所有的副本(replicas)统称为 Assigned Replicas,即 AR。ISR 是 AR 中的一个子集,由 leader 维护 ISR 列表(ISR列表包含leader本身),follower 从 leader 同步数据有一些延迟(由参数 replica.lag.time.max.ms 设置超时阈值),超过阈值的 follower 将被剔除出 ISR, 存入 OSR(Outof-Sync Replicas)列表,新加入的 follower 也会先存放在 OSR 中。

AR=ISR+OSR

HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,consumer 最多只能消费到 HW 所在的位置。另外每个 replica 都有 HW,leader 和 follower 各自负责更新自己的 HW 的状态。对于 leader 新写入的消息,consumer 不能立刻消费,leader 会等待该消息被所有 ISR 中的 replicas 同步后更新 HW,此时消息才能被 consumer 消费。这样就保证了如果 leader 所在的 broker 失效,该消息仍然可以从新选举的 leader 中获取。对于来自内部 broker 的读取请求,没有 HW 的限制。

下图详细的说明了当 producer 生产消息至 broker 后,ISR 以及 HW 和 LEO 的流转过程:

enter image description here

Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。

同步复制要求所有能工作的 follower 都复制完,这条消息才会被 commit,这种复制方式受限于复制最慢的 follower,会极大的影响吞吐率。而异步复制方式下,follower 异步的从 leader 复制数据,数据只要被 leader 写入 log 就被认为已经 commit,这种情况下如果 follower 都还没有复制完,落后于 leader 时,突然 leader 宕机,则会丢失数据,降低可靠性。

Kafka 使用 ISR 的策略则在可靠性和吞吐率方面取得了较好的平衡。

Kafka 的 ISR 的管理最终都会反馈到 ZooKeeper 节点上,具体位置为:/brokers/topics/[topic]/partitions/[partition]/state。有两个地方会对这个 ISR节点进行维护:

  1. Controller 来维护:Kafka 集群中的其中一个 Broker 会被选举为 Controller,主要负责 Partition 管理和副本状态管理,也会执行类似于重分配 partition 之类的管理任务。在符合某些特定条件下,Controller 下的 LeaderSelector 会选举新的 leader,ISR 和新的 leader_epochcontroller_epoch 写入 ZooKeeper 的相关节点中。同时发起 LeaderAndIsrRequest 通知所有的 replicas。
  2. leader 来维护:leader 有单独的线程定期检测 ISR 中 follower 是否脱离 ISR,如果发现 ISR 变化,则会将新的 ISR 的信息返回到 ZooKeeper 的相关节点中。

数据可靠性和持久性保证

当 producer 向 leader 发送数据时,可以通过 request.required.acks 参数来设置数据可靠性的级别:

  1. request.required.acks = 1

    • 这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。
  2. request.required.acks = 0

    • producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。
  3. request.required.acks = -1(all)

    • producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。
    • 要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。不难理解,如果 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,此时拒绝客户端的写请求以防止消息丢失。

深入解读 HW 机制

考虑这样一种场景:acks=-1,部分 ISR 副本完成同步,此时leader挂掉,如下图所示:follower1 同步了消息 4、5,follower2 同步了消息 4,与此同时 follower2 被选举为 leader,那么此时 follower1 中的多出的消息 5 该做如何处理呢?

enter image description here

这里就需要 HW 的协同配合了。如前所述,一个 partition 中的 ISR 列表中,leader 的 HW 是所有 ISR 列表里副本中最小的那个的 LEO。类似于木桶原理,水位取决于最低那块短板。

enter image description here

某个 topic 的某 partition 有三个副本,分别为 A、B、C。A 作为 leader 肯定是 LEO 最高,B 紧随其后,C 机器由于配置比较低,网络比较差,故而同步最慢。这个时候 A 机器宕机,这时候如果 B 成为 leader,假如没有 HW,在 A 重新恢复之后会做同步(makeFollower) 操作,在宕机时 log 文件之后直接做追加操作,而假如 B 的 LEO 已经达到了 A 的 LEO,会产生数据不一致的情况,所以使用 HW 来避免这种情况。 A 在做同步操作的时候,先将 log 文件截断到之前自己的 HW 的位置,即 3,之后再从 B 中拉取消息进行同步。

如果失败的 follower 恢复过来,它首先将自己的 log 文件截断到上次 checkpointed 时刻的 HW 的位置,之后再从 leader 中同步消息。leader 挂掉会重新选举,新的 leader 会发送 “指令” 让其余的 follower 截断至自身的 HW 的位置然后再拉取新的消息。

当 ISR 中的各副本的 LEO 不一致时,如果此时 leader 挂掉,选举新的 leader 时并不是按照 LEO 的高低进行选举,而是按照 ISR 中的顺序(什么顺序???TODO)选举。

Leader 选举

为了保证可靠性,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer。如此,可以避免因部分数据被写进 leader,而尚未被任何 follower 复制就宕机的情况下而造成数据丢失。

对于 producer 而言,它可以选择是否等待消息 commit,这可以通过参数 request.required.acks 来设置。这种机制可以确保:只要 ISR 中有一个或者以上的 follower,一条被 commit 的消息就不会丢失。

如何在保证可靠性的前提下避免吞吐量下降

当 leader 宕机了,怎样在 follower 中选举出新的 leader,因为 follower 可能落后很多或者直接 crash 了,所以必须确保选择 “最新” 的 follower 作为新的 leader。一个基本的原则就是,如果 leader 挂掉,新的 leader 必须拥有原来的 leader 已经 commit 的所有消息,这就是 ISR 中副本的特征。

因此,ISR 列表维持多大的规模合适呢?换言之,leader 在一个消息被 commit 前需要等待多少个 follower 确认呢?等待 follower 的数量越多,与 leader 保持同步的 follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。

少数服从多数的选举原则

一种常用的选举 leader 的策略是 “少数服从多数” ,不过,Kafka 并不是采用这种方式。这种模式下,如果有 2f+1 个副本,那么在 commit 之前必须保证有 f+1 个 replica 复制完消息,同时为了保证能正确选举出新的 leader,失败的副本数不能超过 f 个。

这种方式有个很大的优势,系统的延迟取决于最快的几台机器,也就是说比如副本数为 3,那么延迟就取决于最快的那个 follower 而不是最慢的那个。

“少数服从多数” 的策略也有一些劣势,为了保证 leader 选举的正常进行,它所能容忍的失败的 follower 数比较少,如果要容忍 1 个 follower 挂掉,那么至少要 3 个以上的副本,如果要容忍 2 个 follower 挂掉,必须要有 5 个以上的副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这种算法更多用在 ZooKeeper 这种共享集群配置的系统中,而很少在需要大量数据的系统中使用。

Kafka 选举 leader 的策略

leader 选举的算法非常多,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka 所使用的 leader 选举算法更像是微软的 PacificA 算法。

Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f+1 个副本,一个 Kafka topic 能在保证不丢失已经 commit 消息的前提下容忍 f 个副本的失败,在大多数使用场景下,这种模式是十分有利的。

事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。

与 “少数服从多数” 策略不同的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。

极端情况下的 leader 选举策略

当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:

  1. 等待 ISR 中任意一个 replica 恢复过来,并且选它作为 leader;
  2. 选择第一个恢复过来的 replica(并不一定是在 ISR 中)作为leader。

如何选择呢?这就需要在可用性和一致性当中作出抉择。如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。

选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略。

unclean.leader.election.enable 这个参数对于 leader 的选举、系统的可用性以及数据的可靠性都有至关重要的影响。生产环境中应慎重权衡。

说说 Kafka 数据一致性原理

ACK机制

为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),如果producer收到ack,就会进行下一轮的发送,否则重新发送数据。

img

AR、ISR、OSR

设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower,因为某种故障,迟迟不能与leader进行同步,那leader就要一直等下去,直到它完成同步,才能发送ack。kafka如何解决这个问题呢?

Leader维护了一个动态的in-sync replica set (ISR-同步副本列表),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。如果follower长时间未向leader同步数据,则该follower将被踢出ISR,该时间阈值由replica.lag.time.max.ms参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

  • ISR(In-Sync Replicas ):与leader保持同步的follower集合
  • AR(Assigned Replicas):分区的所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(延迟时间replica.lag.time.max.ms),超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

ACK应答机制

通过 request.required.acks 参数来设置数据可靠性的级别:

  1. request.required.acks = 1

    • 这是默认情况,即:producer 发送数据到 leader,leader 写本地日志成功,返回客户端成功;此时 ISR 中的其它副本还没有来得及拉取该消息,如果此时 leader 宕机了,那么此次发送的消息就会丢失。
    • img
  2. request.required.acks = 0

    • producer 不停向leader发送数据,而不需要 leader 反馈成功消息,这种情况下数据传输效率最高,但是数据可靠性确是最低的。可能在发送过程中丢失数据,可能在 leader 宕机时丢失数据。
  3. request.required.acks = -1(all)

    • producer 发送数据给 leader,leader 收到数据后要等到 ISR 列表中的所有副本都同步数据完成后(强一致性),才向生产者返回成功消息,如果一直收不到成功消息,则认为发送数据失败会自动重发数据。这是可靠性最高的方案,当然,性能也会受到一定影响。
    • 要提高数据的可靠性,在设置 request.required.acks=-1 的同时,还需参数 min.insync.replicas 配合,如此才能发挥最大的功效。min.insync.replicas 这个参数用于设定 ISR 中的最小副本数,默认值为1,当且仅当 request.required.acks 参数设置为-1时,此参数才生效。当 ISR 中的副本数少于 min.insync.replicas 配置的数量时,客户端会返回异常:org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。不难理解,如果 min.insync.replicas 设置为 2,当 ISR 中实际副本数为 1 时(只有leader),将无法保证可靠性,此时拒绝客户端的写请求以防止消息丢失。
    • producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,导致没有返回ack给Producer,由于失败重试机制,又会给新选举出来的leader发送数据,造成数据重复。
    • img

LEO、HW、LSO、LW名词解释

img

  • LEO:LogEndOffset 的缩写,表示Topic中 partition 的副本中的 log 最后一条 Message 的位置。
  • HW:HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,也是consumer 最多只能消费到 HW 所在的位置。
  • LW:LW是Low Watermark的缩写,俗称“低水位”,代表AR集合中最小的logStartOffset值,副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的的被清理,进而导致logStartoffset的增加)
    和删除请求(DeleteRecordRequest)都可能促使LW的增长。
  • LSO:LSO(LogStartOffset不可以缩写为LSO)特指LastStableOffset。它具体与kafka的事务有关。

故障处理细节

img

follower故障

follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等该follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

leader故障

leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。

注意点

这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复(TODO WHY?)。

ISR 集合和 HW、LEO的关系

假设某分区的 ISR 集合中有 3 个副本,即一个 leader 副本和 2 个 follower 副本,此时分区的 LEO 和 HW 都分别为 3 。消息3和消息4从生产者出发之后先被存入leader副本。

img

在消息被写入leader副本之后,follower副本会发送拉取请求来拉取消息3和消息4进行消息同步。

在同步过程中不同的副本同步的效率不尽相同,在某一时刻follower1完全跟上了leader副本而follower2只同步了消息3,如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO 为4,那么当前分区的HW取最小值4,此时消费者可以消费到offset0至3之间的消息。

img

当所有副本都成功写入消息3和消息4之后,整个分区的HW和LEO都变为5,因此消费者可以消费到offset为4的消息了。

img

由此可见kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower副本都复制完,这条消息才会被确认已成功提交,这种复制方式极大的影响了性能。而在异步复制的方式下,follower副本异步的从leader副本中复制数据,数据只要被leader副本写入就会被认为已经成功提交。在这种情况下,如果follower副本都还没有复制完而落后于leader副本,然后leader副本宕机,则会造成数据丢失。kafka使用这种ISR的方式有效的权衡了数据可靠性和性能之间的关系。

AR、ISR、OSR 是什么?

在zk中会保存AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR

  • ISR(in sync replica):是kafka动态维护的一组同步副本,在ISR中有成员存活时,只有这个组的成员才可以成为leader,内部保存的为每次提交信息时必须同步的副本(acks = all时),每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务,当ISR中的副本被认为坏掉的时候,会被踢出ISR,当重新跟上leader的消息数据时,重新进入ISR。
  • OSR(out sync replica):保存的副本不必保证必须同步完成才进行确认,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。

LEO、HW、LSO、LW等分别代表什么

  • LEO:LogEndOffset 的缩写,表示Topic中 partition 的副本中的 log 最后一条 Message 的位置。
  • HW:HW 俗称高水位,HighWatermark 的缩写,取一个 partition 对应的 ISR 中最小的 LEO 作为 HW,也是consumer 最多只能消费到 HW 所在的位置。
  • LW:LW是Low Watermark的缩写,俗称“低水位”,代表AR集合中最小的logStartOffset值,副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的的被清理,进而导致logStartoffset的增加)
    和删除请求(DeleteRecordRequest)都可能促使LW的增长。
  • LSO:LSO(LogStartOffset不可以缩写为LSO)特指LastStableOffset。它具体与kafka的事务有关。

Kafka 在什么情况下会出现消息丢失?

根据消息的传递过程可能有三种情况发生消息丢失。

生产者丢失消息

Kafka 通过配置 request.required.acks 属性来确认消息的生产;

  • 如果 acks 配置为 0,发生网络抖动消息丢了,生产者不校验 ACK 自然就不知道丢了。
  • 如果 acks 配置为 1 保证 leader 不丢,但是如果 leader 挂了,恰好选了一个没有 ACK 的 follower,那也丢了。
  • all:保证 leader 和 follower 不丢,但是如果网络拥塞,没有收到 ACK,会有重复发的问题。

Kafka Broker 丢失消息

操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中,至于什么时候将缓存的数据写入文件中是由操作系统自行决定。

Kafka 提供了一个参数 producer.type 来控制是不是主动 flush,如果 Kafka 写入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步 (sync);写入 mmap 之后立即返回 Producer 不调用 flush 叫异步 (async)。

Kafka 通过多分区多副本机制中已经能最大限度保证数据不会丢失,如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了,当然这种情况很极端。

消费者丢失消息

消费消息的时候主要分为两个阶段:

  • 标识消息已被消费,commit offset 坐标;
  • 处理消息。

不论是是先处理再提交,还是先提交再处理都会发生丢失消息的情况。

  1. 场景一:先 commit 再处理消息。如果在处理消息的时候异常了,但是 offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。
  2. 场景二:先处理消息再 commit。如果在 commit 之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。

怎么尽可能保证 Kafka 的可靠性

根据消息的传递过程,需要在生产者、Broker和消费者三处进行设置来最大程度地保证Kafak的可靠性。

保证生产者的消息可靠性

从本质上来说,生产者与Broker之间是通过网络进行通讯的,因此保障生产者的消息可靠性就必须要保证网络可靠性,这里Kafka使用acks=all可以设置Broker收到消息并同步到所有从节点后给生产者一个确认消息。如果生产者没有收到确认消息就会多次重复向Broker发送消息,保证在生产者与Broker之间的消息可靠性。

保证Broker的消息可靠性

在Broker收到了生产者的消息后,也有可能会丢失消息,最常见的情况是消息到达某个Broker后服务器就宕机了。这里需要补充说明一下Kafka的高可用性,直观的看,Kafka一般可被分成多个Broker节点,而为了增加Kafka的吞吐量,一个topic通常被分为多个partition,每个partition分布在不同的Broker上。如果一个partition丢失就会导致topic内容的部分丢失,因此partition往往需要多个副本,以此来保证高可用。

保证消费者的消息可靠性

这里比较容易发生消息丢失的情况是,消费者从Broker把消息拉过来,如果这个时候还没有消费完,消费者就挂了并且消费者自动提交了offset,那么此时就丢失了一条消息。所以解决办法就是关闭自动提交offset,等真正消费成功之后再手动提交offset。

消费者和消费者组有什么关系?

消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数 group.id 来配置,默认值为空字符串。

Kafka 的每个分区只能被一个消费者线程,如何做到多个线程同时消费一个分区?

将原来的消费线程分拆为2类线程(拉取线程、工作线程),通过这2类线程的分工合作来完成消息的消费:

  • 拉取线程:只负责kafka消息的拉取、分发和消息offset的提交(可选),但不负责消息的业务处理。拉取线程的数量依旧要受Kafka的“一个Partition只能被该Group里的一个Consumer线程消费”规则的限制,即若该Group所订阅的Topic有N个Partition,则该Group最多只能有N个拉取线程。
  • 工作线程:只负责处理消息的业务处理,但不负责kafka消息的拉取和offset的提交。拉取线程拉取到消息后,便可将消息分发给工作线程进行处理。比如拉取线程一次拉取到100个消息,分发给20个工作线程并行异步处理。

Kafka 消费者是否可以消费指定分区消息?

可以;对于原生API,可以使用consumer.assign(partitions)来订阅指定分区。

Kafka 消息格式的演变清楚吗?

TODO

Kafka 高效文件存储设计特点

[Kafka 文件存储机制](#Kafka 文件存储机制)

Kafka 是如何保证高吞吐率的?

Kafka的设计目标是高吞吐量,它比其它消息系统快的原因体现在以下几方面:

  1. Kafka操作的是序列文件I / O(序列文件的特征是按顺序写,按顺序读),为保证顺序,Kafka强制点对点的按顺序传递消息,这意味着,一个consumer在消息流(或分区)中只有一个位置。
  2. Kafka不保存消息的状态,即消息是否被“消费”。一般的消息系统需要保存消息的状态,并且还需要以随机访问的形式更新消息的状态。而Kafka 的做法是保存Consumer在Topic分区中的位置offset,在offset之前的消息是已被“消费”的,在offset之后则为未“消费”的,并且offset是可以任意移动的,这样就消除了大部分的随机IO。
  3. Kafka支持点对点的批量消息传递。
  4. Kafka的消息存储在OS pagecache(页缓存,page cache的大小为一页,通常为4K,在Linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上映像和数据的访问)。

顺序读写kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能

顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写

生产者负责写入数据,Kafka会将消息持久化到磁盘,保证不会丢失数据,Kafka采用了俩个技术提高写入的速度。

  1. 顺序写入:在大学的计算机组成(划重点)里我们学过,硬盘是机械结构,需要指针寻址找到存储数据的位置,所以,如果是随机IO,磁盘会进行频繁的寻址,导致写入速度下降。Kafka使用了顺序IO提高了磁盘的写入速度,Kafka会将数据顺序插入到文件末尾,消费者端通过控制偏移量来读取消息,这样做会导致数据无法删除,时间一长,磁盘空间会满,kafka提供了2种策略来删除数据:基于时间删除和基于partition文件的大小删除。
  2. Memory Mapped Files:这个和Java NIO中的内存映射基本相同,在大学的计算机原理里我们学过(划重点),mmf直接利用操作系统的Page来实现文件到物理内存的映射,完成之后对物理内存的操作会直接同步到硬盘。mmf通过内存映射的方式大大提高了IO速率,省去了用户空间到内核空间的复制。它的缺点显而易见--不可靠,当发生宕机而数据未同步到硬盘时,数据会丢失,Kafka提供了produce.type参数来控制是否主动的进行刷新,如果kafka写入到mmp后立即flush再返回给生产者则为同步模式,反之为异步模式。

2.零拷贝

在这之前先来了解一下零拷贝(直接让操作系统的 Cache 中的数据发送到网卡后传输给下游的消费者):平时从服务器读取静态文件时,服务器先将文件从复制到内核空间,再复制到用户空间,最后再复制到内核空间并通过网卡发送出去,而零拷贝则是直接从内核到内核再到网卡,省去了用户空间的复制。

Kafka把所有的消息存放到一个文件中,当消费者需要数据的时候直接将文件发送给消费者,比如10W的消息共10M,全部发送给消费者,10M的消息在内网中传输是非常快的,假如需要1s,那么kafka的tps就是10w。Zero copy对应的是Linux中sendfile函数,这个函数会接受一个offsize来确定从哪里开始读取。现实中,不可能将整个文件全部发给消费者,他通过消费者传递过来的偏移量来使用零拷贝读取指定内容的数据返回给消费者。

在Linux kernel2.2 之后出现了一种叫做"零拷贝(zero-copy)"系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”,系统上下文切换减少为2次,可以提升一倍的性能。

img

分区

kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力。

批量发送

kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka

  1. 等消息条数到固定条数
  2. 一段时间发送一次

数据压缩

Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩。
压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得

批量发送数据压缩一起使用,单条做数据压缩的话,效果不明显

如何为Kafka集群选择合适的Topics/Partitions数量

越多的partition可以提供更高的吞吐量,但是越多的分区需要打开更多的文件句柄、会导致端对端的延迟、需要机器更多的内存,在出现故障的时候也会需要更长时间的恢复期。

通常对于Topic分区数的计算,需要预估一个这个Topic的吞吐量,然后测试单个生产者和消费者的吞吐量,取较大的指,然后相除得到较为合适的分区数量。

谈谈你对 Kafka 事务的了解?

谈谈你对 Kafka 幂等的了解?

Kafka 优缺点有哪些?

优点:

  • 高性能:单机测试能达到 100w tps;
  • 低延时:生产和消费的延时都很低,e2e的延时在正常的cluster中也很低;
  • 可用性高:replicate + isr + 选举 机制保证;
  • 工具链成熟:监控 运维 管理 方案齐全;
  • 生态成熟:大数据场景必不可少 kafka stream.

缺点:

  • 无法弹性扩容:对partition的读写都在partition leader所在的broker,如果该broker压力过大,也无法通过新增broker来解决问题;
  • 扩容成本高:集群中新增的broker只会处理新topic,如果要分担老topic-partition的压力,需要手动迁移partition,这时会占用大量集群带宽;
  • 消费者新加入和退出会造成整个消费组rebalance:导致数据重复消费,影响消费速度,增加e2e延迟;
  • partition过多会使得性能显著下降:ZK压力大,broker上partition过多让磁盘顺序写几乎退化成随机写。

Kafka 新旧消费者API的区别

属性新版本旧版本
编程语言JavaScala
客户端(Kafka消费者、生产者相对于服务端broker来说都是客户端)所在包org.apache.kafka.clients.*kafka.consumer.*
消费者组的消费进度offset保存位置内部的topic__consumer_offsets中(有compact机制,只会保存最新的消费进度,不用担心消费记录占用空间)ZooKeeper(当消费频繁时需要频繁写zk,zk不适合用作频繁写的场景)
消费者组管理由某个broker担任消费组的协调者coordinator,使得消费组成员变化不用依靠zk来管理,进一步解耦zk依赖zk管理
旧版本的消费者属性高阶API低阶API
是否有消费者组有(多个消费者组成一个组,彼此之间有关联)无(单独的消费者,与其他消费者没有关系)
优缺点死板省事灵活麻烦
使用范围一般的开发场景需要反复消费历史数据的场景。
只想消费部分分区的数据。
需要精确实现处理一次(exactly-once,可以通过把位移提交和数据处理放入一个事务中实现)。
消费的offset选择只能从上次消费的地方接着消费(除非自己去重置消费位移)可以从分区任意位置(前提是kafka消息日志还没有被删除)消费

Kafka 分区数可以增加或减少吗?为什么?

可以使用 bin/kafka-topics.sh 命令对 Kafka 增加 Kafka 的分区数据,但是 Kafka 不支持减少分区数

Kafka 分区数据不支持减少是由很多原因的,比如减少的分区其数据放到哪里去?是删除,还是保留?删除的话,那么这些没消费的消息不就丢了。如果保留这些消息如何放到其他分区里面?追加到其他分区后面的话那么就破坏了 Kafka 单个分区的有序性。如果要保证删除分区数据插入到其他分区保证有序性,那么实现起来逻辑就会非常复杂。

Kafka消息丢失的场景和解决方案

[Kafka 在什么情况下会出现消息丢失?](#Kafka 在什么情况下会出现消息丢失?)

Kafka消息消费是pull还是push,并分析两种方式的优劣性

Kafka消息消费是消费者pull。

pull模型优点:

  1. 消费端可以根据自身消费能力决定是否pull(流转机制) 。

pull模型缺点:

  1. 如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到达。

push模型优点:

  1. 实时性强

push模型缺点:

  1. push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。

简述Kafka的rebalance机制

重平衡的作用

让消费者分组内消费者消费哪些主题分区达成一致。重平衡需要借助Kafka Broker端的Coordinator组件,在Coordinator的帮助下完成消费者组的分区重新分配。

触发重平衡的三个条件

  1. 消费者组内成员数量发生变化
  2. 消费者组订阅主题的数量发生变化
  3. 订阅主题的分区数量发生变化。

重平衡的流程

重平衡是通过消费者端的心跳线程通知其他的消费实例发生重平衡。重平衡开启后,borker通过维护一套消费者组状态机来协调完成整个重平衡机制。

消费者的五种状态

状态含义
Empty组内没有任何成员,但是消费者可能存在已提交的位移数据,而且这些唯一尚未过期
Dead同样是组内没有任何成员,但是组的元数据信息已经被协调者端移除,协调者保存着当前向他注册过的所有组信息
PreparingRebalance消费者组准备开启重平衡,此时所有成员都需要重新加入消费者组
CompletingRebalance消费者组下所有成员已经加入,各个成员中等待分配方案
Stable消费者组的稳定状态,该状态表明重平衡已经完成,组内成员能够正常消费数据

五种状态的流转

  1. 一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
  2. 当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。

详细流程

在消费者端,重平衡分为两个步骤:分别是加入组和等待领导者消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。

  1. 当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。领导者消费者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
  2. 选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。
  3. 其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。

Broker端重平衡流程

场景一:新成员加入

当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。

image.png

场景二:成员主动离组

消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员

image.png

场景三:组成员崩溃离组

他和成员主动离组的区别是它属于被动离开,broker端不知道成员离组,经过心跳超时后,判定成员离组的一种情况。

image.png

简述Kafka中zookeeper的作用

Zookeeper的主要作用是在集群中的不同节点之间建立协调;如果任何节点失败,我们还使用Zookeeper从先前提交的偏移量中恢复,因为它做周期性提交偏移量工作。

ZooKeeper作用之:Broker注册

Broker是分布式部署并且相互之间相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点。

ZooKeeper作用之:Topic注册

在Kafka中,同一个Topic的消息会被分成多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录。

ZooKeeper作用之:生产者负载均衡

由于同一个Topic消息会被分区并将其分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

ZooKeeper作用之:消费者负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不同的消费者分组消费自己特定的Topic下面的消息,互不干扰。

ZooKeeper作用之:分区与消费者的关系

消费组(Consumer Group):consumer group下有多个Consumer(消费者),对于每个消费者组 (Consumer Group),Kafka都会为其分配一个全局唯一的Group ID,Group 内部的所有消费者共享该 ID。订阅的topic下的每个分区只能分配给某个 group 下的一个consumer(当然该分区还可以被分配给其他group)。同时,Kafka为每个消费者分配一个Consumer ID,通常采用"Hostname:UUID"形式表示。

在Kafka中,规定了每个消息分区 只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上。

ZooKeeper作用之:消费进度Offset记录

在消费者对指定消息分区进行消息消费的过程中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录。节点内容是Offset的值。

ZooKeeper作用之:消费者注册

每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点。

早期版本的Kafka用zk做meta信息存储,consumer的消费状态,group的管理以及offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中确实逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖。

参考资料