高可用的由来
为何需要Replication?
在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,
这与Kafka数据持久性及Delivery Guarantee
的设计目标相悖。同时Producer都不能再将数据存储在该broker上的Partition中。
如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可选择继续选择发送。 而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据的丢失。
如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据, 这会造成数据丢失并且用户只能通过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。
由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加, 整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。
Leader Election
引入Replication之后,同一个Partition可能会有多个Replication,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互, 其它Replication作为Follower从Leader中复制数据。
因为需要保证同一个Partition的多个Replication之间的数据一致性(其中一个宕机后其它Replication必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。
如果没有一个Leader,所有Replication都可同时读/写
数据,那就需要保证多个Replication之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,
大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),
系统更加简单且高效。
Kafka HA设计解析
如何将所有Replica均匀分布到整个集群?
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。
同时为了提高Kafka的容错能力,也需要将同一个Partition的Replication尽量分散到不同的机器。实际上,如果所有的Replication都在同一个Broker上,那一旦该Broker宕机, 该Partition的所有Replication都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replication的算法如下:
1、将所有Broker(假设共n个Broker)和待分配的Partition排序
2、将第i个Partition分配到第(i mod n)个Broker上
3、将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
Data Replication(副本策略)
Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。
消息传递同步策略
Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少, Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上, Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replication的ACK, 该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息, Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。
Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。
Kafka Replication的数据流如下图所示:
ACK前需要保证有多少个备份
对于Kafka而言,定义一个Broker是否“活着”包含两个条件:
1、是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。
2、是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的Replicatione列表,该列表称为ISR(即in-sync Replica)
。如果一个Follower宕机,或者落后太多
,Leader将把它从ISR中移除。
这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值
(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.max.messages
配置,其默认值是4000
)或者
Follower超过一定时间(该值可在$KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms
来配置,其默认值是10000
)未向Leader发送fetch请求。
Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,
这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,
这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
Follower可以批量的从Leader复制数据
,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。
需要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题
。一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。
这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。
而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks
来设置。这种机制确保了只要ISR有一个或以上的Follower,
一条被commit的消息就不会丢失。
Leader Election算法
Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:
1、节点名称唯一性:多个客户端创建一个节点,只有成功创建节点的客户端才能获得锁
2、临时顺序节点:所有客户端在某个目录下创建自己的临时顺序节点,只有序号最小的才获得锁
一种非常常用的选举leader的方式是“Majority Vote”(少数服从多数
),但Kafka并未采用这种方式。
这种模式下,如果我们有2N+1
个Replication(包含Leader和Follower),那在commit之前必须保证有N+1
个Replication复制完消息。
为了保证正确选出新的Leader,fail的Replica不能超过N个。因为在剩下的任意N+1个Replication里,至少有一个Replication包含有最新的所有消息。
这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。
Majority Vote
也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,
必须要有3个以上的Replication,如果要容忍2个Follower挂掉,必须要有5个以上的Replication。也就是说,在生产环境下为了保证较高的容错程度,
必须要有大量的Replication,而大量的Replication又会在大数据量下导致性能的急剧下降
。
这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。
例如HDFS的HA Feature是基于majority-vote-based journal
,但是它的数据存储并没有使用这种方式。
Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas)
,这个ISR里的所有Replication
都跟上了leader,只有ISR里的成员才有被选为Leader的可能。
在这种模式下,对于N+1个Replication,一个Partition能在保证不丢失已经commit的消息的前提下容忍N个Replication的失败。
在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍N个Replica的失败,Majority Vote
和ISR
在commit前需要等待的Replica数量是一样的,
但是ISR需要的总的Replication的个数几乎是Majority Vote的一半。
虽然Majority Vote
与ISR
相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,
并且节省下来的Replica和磁盘使得ISR模式仍然值得。
选举Leader
最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点, 而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
但是该方法会有3个问题:
1.split-brain
这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replication“看”到的状态是一样的,这就可能造成不同Replication的响应不一致
2.herd effect
如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
3.ZooKeeper负载过重,每个Replication都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。
Kafka 0.8.*的Leader Election
方案解决了上述问题,它在所有broker中选出一个controller
,所有Partition的Leader选举都由controller决定。
controller会将Leader的改变直接通过RPC的方式
(比ZooKeeper Queue的方式更高效)通知需为为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
如何处理所有Replication都不工作?
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replication
都宕机了,就无法保证数据不丢失了。
这种情况下有两种可行的方案:
1.等待ISR中的任一个Replication“活”过来,并且选它作为Leader
2.选择第一个“活”过来的Replication(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR
中的Replication“活”过来,那不可用的时间就可能会相对较长。
而且如果ISR中的所有Replication都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replication作为Leader,
而这个Replication不是ISR中的Replication,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源 (前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种, 从而根据不同的使用场景选择高可用性还是强一致性。
Kafka delivery guarantee
kafka消息传输有这么几种可能:
At most once 消息可能会丢,但绝不会重复传输
At least once 消息绝不会丢,但可能会重复传输
Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,
遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,
但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once
。
接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。
Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。 该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。
当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。
但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,
而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic
。
Kafka默认保证At least once
,并且允许通过设置Producer异步提交来实现At most once
。
而Exactly once要求与外部存储系统协作
,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。
Producer发布消息
写入方式
Producer 采用 push 模式将消息发布到 broker,每条消息都被 append
到 partition
中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。
消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
-
指定了 patition,则直接使用;
-
未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
-
patition 和 key 都未指定,使用轮询选出一个 patition。
写入流程
流程说明:
-
producer
先从 zookeeper 的 “/brokers/…/state” 节点找到该 partition 的 leader -
producer
将消息发送给该 leader -
leader
将消息写入本地 log -
followers
从 leader pull 消息,写入本地 log 后 leader 发送 ACK (另一种:followers获取到消息暂存在内存中时返回 ACK)
-
leader
收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
broker保存消息
存储方式
物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置), 每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件)。
存储策略
无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
-
基于时间:
log.retention.hours=168
-
基于大小:
log.retention.bytes=1073741824
Topic的创建和删除过程
创建topic
流程说明:
-
controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replication 分配。
-
controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:
-
从分配给该 partition 的所有 replication(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR
-
将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
-
-
controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。
删除topic
流程说明:
-
controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
-
若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。
broker failover
流程说明:
-
controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
-
controller 从 /brokers/ids 节点读取可用broker
-
controller决定set_p,该集合包含宕机 broker 上的所有 partition
-
对 set_p 中的每一个 partition
-
从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
-
决定新 leader
-
将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
-
-
通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令
controller failover
当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 “/controller” 节点注册 watcher, 当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path, 只有一个竞选成功并当选为 controller。
当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:
- 读取并增加
Controller Epoch
。 - 在
reassignedPartitions Patch(/admin/reassign_partitions)
上注册 watcher。 - 在
preferredReplicaElection Path(/admin/preferred_replica_election)
上注册 watcher。 - 通过
partitionStateMachine
在broker Topics Patch(/brokers/topics)
上注册 watcher。 - 若
delete.topic.enable=true
(默认值是 false),则partitionStateMachine
在Delete Topic Patch(/admin/delete_topics)
上注册 watcher。 - 通过
replicaStateMachine
在Broker Ids Patch(/brokers/ids)
上注册Watch。 - 初始化
ControllerContext
对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。 - 启动
replicaStateMachine
和partitionStateMachine
。 - 将
brokerState
状态设置为RunningAsController
。 - 将每个
partition
的Leadership
信息发送给所有“活”着的 broker。 - 若
auto.leader.rebalance.enable=true
(默认值是true),则启动partition-rebalance
线程。 - 若
delete.topic.enable=true
且Delete Topic Patch(/admin/delete_topics)
中有值,则删除相应的Topic。