Springboot集成Kafka rebalanced重复消费问题

5,237 阅读9分钟

问题描述

这几天在生产环境发现了kafka有重复消费的问题,通过日志可以发现经常出现rebalanced的日志告警:

WARN  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Synchronous auto-commit of offsets {am_performance_topic-0=OffsetAndMetadata{offset=27914, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Revoking previously assigned partitions [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer - partitions revoked: [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] (Re-)joining group
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Successfully joined group with generation 474
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=boot_kafka] Setting newly assigned partitions [am_performance_topic-0]
INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] o.s.k.l.KafkaMessageListenerCont

对于我这个kafka只懂点鸡毛蒜皮的弱鸡,唯一的办法就是网上各种搜。以下是我在解决问题中有帮助的博文:

Kafka session.timeout.ms heartbeat.interval.ms参数的区别以及对数据存储的一些思考
Kafka auto.offset.reset值详解
kafka之consumer参数auto.offset.reset
spring-kafka生产者消费者配置详解
谈谈Kafka Consumer Group的Coordinator与Rebalance机制

分析

我们来分析以下告警日志:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

可以获取到几个关键的参数max.poll.interval.msmax.poll.records以及session timeout。这段日志的简单翻译就是

无法完成提交,因为kafka Group已rebalanced并将分区分配给另一个成员。这通常意味着poll循环花费了太多时间处理消息,比配置的max.poll.interval.ms长。您可以通过增加会话超时时间或通过减小poll()中返回的最大批大小(max.poll.records)来解决此问题

我们先来了解以下这几个参数的意义,根据Kafka session.timeout.ms heartbeat.interval.ms参数的区别以及对数据存储的一些思考
因为一个topic往往有多个分区,而我们又会在一个consumer group里面创建多个消费者消费这个topic,因此:就有了一个问题:哪些的分区的消息交给哪个consumer消费呢?这里涉及到三个概念:consumer group,consumer group里面的consumer,以及每个consumer group有一个 group coordinator。conusmer分区分配是通过组管理协议来实施的;

consumer group里面的各个consumer都向 group coordinator发送JoinGroup请求,这样group coordinator就有了所有consumer的成员信息,于是它从中选出一个consumer作为Leader consumer,并告诉Leader consumer说:你拿着这些成员信息和我给你的topic分区信息去安排一下哪些consumer负责消费哪些分区吧

接下来,Leader consumer就根据我们配置的分配策略(由参数partition.assignment.strategy指定)为各个consumer计算好了各自待消费的分区。于是,各个consumer向 group coordinator 发送SyncGroup请求,但只有Leader consumer的请求中有分区分配策略,group coordinator 收到leader consumer的分区分配方案后,把该方案下发给各个consumer。画个图,就是下面这样的:

而在正常情况下 ,当有consumer进出consumer group时就会触发rebalance,所谓rebalance就是重新制订一个分区分配方案。而制订好了分区分配方案,就得及时告知各个consumer,这就与 heartbeat.interval.ms参数有关了。具体说来就是:每个consumer 都会根据 heartbeat.interval.ms 参数指定的时间周期性地向group coordinator发送 hearbeat,group coordinator会给各个consumer响应,若发生了 rebalance,各个consumer收到的响应中会包含 REBALANCE_IN_PROGRESS 标识,这样各个consumer就知道已经发生了rebalance,同时 group coordinator也知道了各个consumer的存活情况。

那为什么要把 heartbeat.interval.ms 与 session.timeout.ms 进行对比呢?session.timeout.ms是指:group coordinator检测consumer发生崩溃所需的时间。一个consumer group里面的某个consumer挂掉了,最长需要 session.timeout.ms 秒检测出来。举个示例session.timeout.ms=10,heartbeat.interval.ms=3

session.timeout.ms是个"逻辑"指标,它指定了一个阈值---10秒,在这个阈值内如果coordinator未收到consumer的任何消息,那coordinator就认为consumer挂了。而heartbeat.interval.ms是个"物理"指标,它告诉consumer要每3秒给coordinator发一个心跳包,heartbeat.interval.ms越小,发的心跳包越多,它是会影响发TCP包的数量的,产生了实际的影响,这也是我为什么将之称为"物理"指标的原因。

如果group coordinator在一个heartbeat.interval.ms周期内未收到consumer的心跳,就把该consumer移出group,这有点说不过去。就好像consumer犯了一个小错,就一棍子把它打死了。事实上,有可能网络延时,有可能consumer出现了一次长时间GC,影响了心跳包的到达,说不定下一个heartbeat就正常了。

而heartbeat.interval.ms肯定是要小于session.timeout.ms的,如果consumer group发生了rebalance,通过心跳包里面的REBALANCE_IN_PROGRESS,consumer就能及时知道发生了rebalance,从而更新consumer可消费的分区。而如果超过了session.timeout.ms,group coordinator都认为consumer挂了,那也当然不用把 rebalance信息告诉该consumer了。

在kafka0.10.1之后的版本中,将session.timeout.ms 和 max.poll.interval.ms 解耦了。也就是说:new KafkaConsumer对象后,在while true循环中执行consumer.poll拉取消息这个过程中,其实背后是有2个线程的,即一个kafka consumer实例包含2个线程:一个是heartbeat 线程,另一个是processing线程,processing线程可理解为调用consumer.poll方法执行消息处理逻辑的线程,而heartbeat线程是一个后台线程,对程序员是"隐藏不见"的。如果消息处理逻辑很复杂,比如说需要处理5min,那么 max.poll.interval.ms可设置成比5min大一点的值。而heartbeat 线程则和上面提到的参数 heartbeat.interval.ms有关,heartbeat线程 每隔heartbeat.interval.ms向coordinator发送一个心跳包,证明自己还活着。只要 heartbeat线程 在 session.timeout.ms 时间内 向 coordinator发送过心跳包,那么 group coordinator就认为当前的kafka consumer是活着的。

在kafka0.10.1之前,发送心跳包和消息处理逻辑这2个过程是耦合在一起的,试想:如果一条消息处理时长要5min,而session.timeout.ms=3000ms,那么等 kafka consumer处理完消息,group coordinator早就将consumer 移出group了,因为只有一个线程,在消息处理过程中就无法向group coordinator发送心跳包,超过3000ms未发送心跳包,group coordinator就将该consumer移出group了。而将二者分开,一个processing线程负责执行消息处理逻辑,一个heartbeat线程负责发送心跳包,那么:就算一条消息需要处理5min,只要底heartbeat线程在session.timeout.ms向group coordinator发送了心跳包,那consumer可以继续处理消息,而不用担心被移出group了。另一个好处是:如果consumer出了问题,那么在 session.timeout.ms内就能检测出来,而不用等到 max.poll.interval.ms 时长后才能检测出来。

小总结

所以在触发rebalanced重复消费时,有两种原因:

  • 一个是在session.timeout.ms时间内,没有收到心跳包,心跳线程每heartbeat.interval.ms时间间隔会发送一次心跳。
  • 另一个就是在max.poll.interval.ms时间内,kafka消费者上一次抓取的最多max.poll.records数量的消息没有处理完,没能在max.poll.interval.ms时间内返回更新offset的信息。

所以这一次的问题也很简单了,就是上述的第二种情况,解决方法就是增加max.poll.interval.ms超时时间,减少max.poll.records(每一次poll的消息量)。如果是使用spring-kafka框架的,这部分的参数如何写可以参考 Spring Boot Reference Guide以及上面博文中的spring-kafka生产者消费者配置详解。最后修改如下:

kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer:  org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: true
      group-id: boot_kafka
      auto-commit-interval: 100
      auto-offset-reset: earliest
      max-poll-records: 50
      bootstrap-servers:  127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
      properties:
        max.poll.interval.ms: 1200000

max.poll.interval.ms默认值是300000(5分钟),max.poll.records默认值为500。更多参数指标描述参考Apache Kafka Documentation

再讲一下auto-offset-reset字段

auto.offset.reset值含义解释

  • earliest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  • latest
    当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  • none
    topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

测试过程:

  • earliest模式: kafka source的名称为a1

    1. 在a1中,topic为test1,groupId为0001,0001从未被消费过,数据(24条)提前发送,再启动sql1(select * from a1 ),会从头开始消费,显示24条数据

    2. 停掉1中所提到的sql1,发送不同的6条数据到kafka中,不更换a1的groupId,再启动sql1(select * from a1 ),会接着上次消费的位置开始往后消费,显示6条数据

  • latest模式:kafka source的名称为a2

    1. 在a2中,topic为b,groupId为0002,0002未被消费,数据提前发送,再启动sql2(select * from a2),在jmeter上未看到结果,在flink中查看相关metrics,无数据读入;在不杀掉sql2的前提下,发送一批(8条)数据,只消费后发送的8条数据。

    2. 停掉1中的sql2,不更换a2中的groupId,发送7条数据到b中,启动sql2,只显示后发送的7条数据

  • none模式: kafka source的名称为a3

    1. 在a3中,topic为c,设置groupId为0001(未被消费过),数据提前发送,再启动sql3(select * from a3),sql执行失败,在日志中报错。

    2. 在a3中,topic为c,设置groupId为0002(被消费过),启动sql3(select * from a3),发送8条数据到c中,jmeter中显示8条数据。