在解释Kafka重复消费出现原因之前,列举一下Kafka中与消费者有关的几个重要配置参数。

  • enable.auto.commit:表示消费者会周期性自动提交消费的offset。默认值true。

  • auto.commit.interval.ms:在enable.auto.commit为true的情况下, 自动提交的间隔。默认值5秒。

  • max.poll.records:单次消费者拉取的最大数据条数,默认值500。

  • max.poll.interval.ms:表示若在阈值时间之内消费者没有消费完上一次poll的消息,consumer client会主动向coordinator发起LeaveGroup请求,触发Rebalance;然后consumer重新发送JoinGroup请求。

  • session.timeout.ms:group Coordinator检测consumer发生崩溃所需的时间。在这个时间内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。默认值10秒。

  • heartbeat.interval.ms:标识Consumer给Coordinator发一个心跳包的时间间隔。heartbeat.interval.ms越小,发的心跳包越多。默认值3秒。

Group Coordinator

基于Zookeeper的Rebalance存在不可避免的羊群效应和脑裂问题,如何不用Zookeeper来协调,而是将失败探测和Rebalance的逻辑放到一个高可用的中心,那么上述问题就能得以解决。因此Kafka0.9的版本重新设计了Consumer端,设计了Coordinator机制,大大减少了Zookeeper负载。

对于每一个Consumer Group,Kafka集群为其从Broker集群中选择一个Broker作为其Coordinator。Coordinator主要做两件事:

  1. 维持Group成员的组成。这包括加入新的成员,检测成员的存活性,清除不再存活的成员。

  2. 协调Group成员的行为。

羊群效应:以Zookeeper为例,由于一个被watch的znode变化,导致大量的通知需要被发送,将会导致在这个通知期间的其他操作提交的延迟。

重复消费的原因

原因1:消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。

原因2:设置enable.auto.commit为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。

原因3:消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。

重复消费的解决方法

  1. 提高消费者的处理速度。例如:对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的同时,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records的值。

  2. 引入消息去重机制。例如:生成消息时,在消息中加入唯一标识符如消息id等。在消费端,可以保存最近的max.poll.records条消息id到redis或mysql表中,这样在消费消息时先通过查询去重后,再进行消息的处理。

  3. 保证消费者逻辑幂等。

最后修改于
上一篇