Kafka在实践中避免重复消费和丢失数据

By timebusker on June 14, 2019

Kafka保证数据不丢失的原理

消息的位置

维护其消息偏移量对于避免消息的重复消费与遗漏消费,确保消息的Exactly-once是至关重要的。kafka的消息所在的位置TopicPartitionsOffsets三个因素决定。 而Kafka消费者消费的消息位置还与consumergroup.id有关。

earlieastLeaderOffsets: 存储在broker上的leader节点的最早的消息偏移量 consumerOffsets: 消费者消费的消息偏移量位置

  • 情况一:earlieastLeaderOffsets偏移量小于consumerOffsets

情况一下,消费端偏移量consumerOffsets始终大于earlieastLeaderOffsets,可以正常消费。 但存储在broker上的kafka的消息常设置消息过期配置,当到达过期时间时过期的消息将会被清除。因此可能会产生另一种情况。

  • 情况二:earlieastLeaderOffsets偏移量大于consumerOffsets

此种情况会抛出kafka.common.OffsetOutOfRangeException异常。当情况三发生时,在(consumerOffsets,earlieastLeaderOffsets)区间内的消息还没有被消费就已经被清除了,将导致两个后果。

消息数据丢失 消费端抛出kafka.common.OffsetOutOfRangeException异常

在对消息完整性有严格要求的系统中,消息的丢失造成的影响会比较严重,所以在这种情况下,要保证消息不会丢失。

修改消息清理机制,尽量延长保留消息数据的时间周期 当消费者意外中断时,重新启动消费时能够从上一次中断的消息偏移量开始消费。

生产者数据的不丢失

kafka的ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。

  • 同步模式

ack机制能够保证数据的不丢失,如果ack设置为0,风险很大,一般不建议设置为0

# 消息发送失败重试次数
spring.kafka.producer.retries=3
# 消息接收确认机制 
spring.kafka.producer.acks=1
  • 异步模式

通过spring.kafka.producer.buffer-memory来进行控制数据的发送,有两个值来进行控制,时间阈值与消息的数量阈值,如果buffer满了数据还没有发送出去,且设置的是立即清理模式,风险很大,一定要设置为阻塞模式。

# 消息发送失败重试次数
spring.kafka.producer.retries=3
# 消息接收确认机制 
spring.kafka.producer.acks=1
# 数据批次提交设置要合理
spring.kafka.producer.batch-size=10
# 缓存区设置要合理
spring.kafka.producer.buffer-memory=1024

消费者数据的不丢失

通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,接着上次的offset进行消费即可。

kafka的offset的重置

spark读取kafka消息时,每次读取都会从kafka最新的offset读取。但是如果数据丢失,如果在使用Kafka来分发消息, 在数据处理的过程中可能会出现处理程序出异常或者是其它的错误,会造成数据丢失或不一致。这个时候你也许会想要通过kafka把数据从新处理一遍, 或者指定kafka的offset读取。kafka默认会在磁盘上保存到7天的数据,你只需要把kafka的某个topic的consumer的offset设置为某个值或者是最小值, 就可以使该consumer从你设置的那个点开始消费。这就需要从zk里面修改offset的值。

  • 查询topic的offset的范围
# 查看topic每个分区的offset的最小值
sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 12.12.12.6:9092 -topic imooc-access --time -2

# 查看topic每个分区的offset的最大值
sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 12.12.12.6:9092 -topic imooc-access --time -1
  • 设置consumer group的offset
# 连接zookeeper 
./bin/zkCli.sh -server 12.12.12.6:2181

# 查看所有的消费组信息
sh kafka-consumer-groups.sh --bootstrap-server 12.12.12.6:9092 --list

# 查看指定消费组的消费信息
sh kafka-consumer-groups.sh --bootstrap-server 12.12.12.6:9092 --group spark-etl --describe

(组)           (TOPIC)      (分区号)(消费组偏移量)(消息总偏移量) (消息剩余量:未消费)  
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
spark-etl       imooc-access    2          63              78              15              -               -               -
spark-etl       imooc-access    0          62              77              15              -               -               -
spark-etl       imooc-access    1          63              79              16              -               -               -

# 手动修改消息偏移量
sh kafka-consumer-groups.sh --bootstrap-server 12.12.21.6:9092 --group spark-etl --topic imooc-access --execute --reset-offsets --to-offset 80

  • 手动更新Kafka存在Zookeeper中的偏移量

我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。 Kafka内置为我们提供了修改偏移量的类:kafka.tools.UpdateOffsetsInZK,我们可以通过它修改Zookeeper中某个主题的偏移量,具体操作如下:

# 更新offset值(不可执行),只能把Zookeeper中偏移量设置成earliest或者latest
# 需要编辑consumer.properties配置文件
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [earliest | latest] consumer.properties topic

Kafka重复消费原因

  • 原因1: 强行kill线程,导致消费后的数据,offset没有提交。

  • 原因2: 设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。

  • 原因3: 连接不上kafka。消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(默认是30秒),那么就会reblance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

设置合理的session时间
  • 原因4: 当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
重新分区需要消费完数据后删除topic重新建,避免消息被重复使用
  • 原因5: 消费处理数据慢。消费者每次poll的数据业务处理时间超过kafka的max.poll.interval.ms,默认是300秒,导致kafka的broker认为consumer挂掉,触发kafka进行rebalance动作,导致重新消费。
可适当延迟`max.poll.interval.ms`或者减小拉取消息的数量`max-poll-records`

Kafka Consumer丢失数据原因

  • 设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

  • 网络数据丢包

首先对kafka进行限速,其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all。