Kafka保证数据不丢失的原理
消息的位置
维护其消息偏移量对于避免消息的重复消费与遗漏消费,确保消息的Exactly-once
是至关重要的。kafka的消息所在的位置Topic
、Partitions
、Offsets
三个因素决定。
而Kafka消费者消费的消息位置还与consumer
的group.id
有关。
earlieastLeaderOffsets
: 存储在broker
上的leader
节点的最早的消息偏移量consumerOffsets
: 消费者消费的消息偏移量位置
- 情况一:earlieastLeaderOffsets偏移量小于consumerOffsets
情况一下,消费端偏移量consumerOffsets始终大于earlieastLeaderOffsets,可以正常消费。 但存储在broker上的kafka的消息常设置消息过期配置,当到达过期时间时过期的消息将会被清除。因此可能会产生另一种情况。
- 情况二:earlieastLeaderOffsets偏移量大于consumerOffsets
此种情况会抛出kafka.common.OffsetOutOfRangeException
异常。当情况三发生时,在(consumerOffsets,earlieastLeaderOffsets)
区间内的消息还没有被消费就已经被清除了,将导致两个后果。
消息数据丢失 消费端抛出
kafka.common.OffsetOutOfRangeException
异常
在对消息完整性有严格要求的系统中,消息的丢失造成的影响会比较严重,所以在这种情况下,要保证消息不会丢失。
修改消息清理机制,尽量延长保留消息数据的时间周期 当消费者意外中断时,重新启动消费时能够从上一次中断的消息偏移量开始消费。
生产者数据的不丢失
kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。
- 同步模式
ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0
# 消息发送失败重试次数
spring.kafka.producer.retries=3
# 消息接收确认机制
spring.kafka.producer.acks=1
- 异步模式
通过spring.kafka.producer.buffer-memory
来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,且设置的是立即清理模式,风险很大,一定要设置为阻塞模式。
# 消息发送失败重试次数
spring.kafka.producer.retries=3
# 消息接收确认机制
spring.kafka.producer.acks=1
# 数据批次提交设置要合理
spring.kafka.producer.batch-size=10
# 缓存区设置要合理
spring.kafka.producer.buffer-memory=1024
消费者数据的不丢失
通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,接着上次的offset进行消费即可。
kafka的offset的重置
spark读取kafka消息时,每次读取都会从kafka最新的offset读取。但是如果数据丢失,如果在使用Kafka来分发消息, 在数据处理的过程中可能会出现处理程序出异常或者是其它的错误,会造成数据丢失或不一致。这个时候你也许会想要通过kafka把数据从新处理一遍, 或者指定kafka的offset读取。kafka默认会在磁盘上保存到7天的数据,你只需要把kafka的某个topic的consumer的offset设置为某个值或者是最小值, 就可以使该consumer从你设置的那个点开始消费。这就需要从zk里面修改offset的值。
- 查询topic的offset的范围
# 查看topic每个分区的offset的最小值
sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 12.12.12.6:9092 -topic imooc-access --time -2
# 查看topic每个分区的offset的最大值
sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 12.12.12.6:9092 -topic imooc-access --time -1
- 设置consumer group的offset
# 连接zookeeper
./bin/zkCli.sh -server 12.12.12.6:2181
# 查看所有的消费组信息
sh kafka-consumer-groups.sh --bootstrap-server 12.12.12.6:9092 --list
# 查看指定消费组的消费信息
sh kafka-consumer-groups.sh --bootstrap-server 12.12.12.6:9092 --group spark-etl --describe
(组) (TOPIC) (分区号)(消费组偏移量)(消息总偏移量) (消息剩余量:未消费)
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
spark-etl imooc-access 2 63 78 15 - - -
spark-etl imooc-access 0 62 77 15 - - -
spark-etl imooc-access 1 63 79 16 - - -
# 手动修改消息偏移量
sh kafka-consumer-groups.sh --bootstrap-server 12.12.21.6:9092 --group spark-etl --topic imooc-access --execute --reset-offsets --to-offset 80
- 手动更新Kafka存在Zookeeper中的偏移量
我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper
中的数据了。
Kafka内
置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK
,我们可以通过它修改Zookeeper
中某个主题的偏移量,具体操作如下:
# 更新offset值(不可执行),只能把Zookeeper中偏移量设置成earliest或者latest
# 需要编辑consumer.properties配置文件
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [earliest | latest] consumer.properties topic
Kafka重复消费原因
-
原因1:
强行kill线程
,导致消费后的数据,offset没有提交。 -
原因2: 设置offset为自动提交,
关闭kafka时
,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。 -
原因3:
连接不上kafka
。消费后的数据,当offset还没有提交时,partition就断开连接
。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout
时间(默认是30秒),那么就会reblance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
设置合理的session时间
- 原因4: 当消费者
重新分配partition
的时候,可能出现从头开始消费的情况,导致重发问题。
重新分区需要消费完数据后删除topic重新建,避免消息被重复使用
- 原因5:
消费处理数据慢
。消费者每次poll的数据业务处理时间超过kafka的max.poll.interval.ms,默认是300秒,导致kafka的broker认为consumer挂掉,触发kafka进行rebalance动作,导致重新消费。
可适当延迟`max.poll.interval.ms`或者减小拉取消息的数量`max-poll-records`
Kafka Consumer丢失数据原因
-
设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
-
网络数据丢包
首先对kafka进行限速,其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all。