Kafka消费者组重平衡

本文最后更新于 2024年6月25日 晚上

Kafka消费者组重平衡

重平衡的作用是让消费者组的消费者在消费哪些主题分区上达成共识。

重平衡要思考的问题

  • 重平衡的条件是什么?
  • 怎么知道达成了重平衡的条件?
  • 怎么通知消费者组开始重平衡?
  • 重平衡具体是怎么做的?
  • 重平衡真的是银弹吗?

重平衡条件

  • 消费者组订阅的主题数量发生变化。比如通过正则表达式订阅主题。
  • 消费者组订阅的主题的分区数量发生变化。比如手动新增了主题的分区。
  • 消费者组内的消费者数量发生变化。比如消费者消费者挂了,或者新增了消费者。

一旦发生了以上三种情况任意一种时,分区和消费者无法按照原先的策略继续消费,产生了所谓的消费不平衡的状态,所以此时需要重平衡。

通知和触发

当出现不平衡的情况时,Kafka是怎么感知和通知的消费者进行重平衡的呢?在消费者处于正常状态时,会定时发送心跳到运行在broker上的consumer group coordinator上。当有消费者挂掉时,心跳无法及时发送,coordinator可以感知到有消费者离组了,此时coordinator会将开始重平衡的信息装进消费者的心跳响应中,这样消费者就得到了重平衡的通知。

由此可以知道,消费者对于重平衡感知的及时性是由心跳的间隔决定的。可以通过heartbeat.interval.ms设置一个很短的心跳间隔让消费者更快地感应到重平衡。

最初消费者的心跳是消费者主线程发送的,也就是执行KafkaConsumer.poll()的线程发送的。这样出现了一个问题,消费者的消费逻辑也是在主线程里执行的,如果消费逻辑执行过慢了,心跳来不及发送,coordinator会认为消费者已经离线了,导致开启了不是预期的重平衡。所以在0.10.1.0版本中,消费者的心跳不再由主线程发送,让一个专门的线程执行心跳请求发送,来避免以上的问题。

消费者组的状态

kafka为消费组定义了五个状态。

状态 描述
Empty 消费者组内没有成员,不过还存在提交的消费者位移,这些消费位移还没有过期。
Dead 消费者组内没有成员,同时消费者组的元信息被coordinator移除。
PreparingRebalancing 消费者组准备重平衡,消费者需要重新加入消费组。
CompletingRebalancing 消费者已经加入消费者组,等待接收分配方案。
Stable 消费者组处于正常消费Kafka的状态。

重平衡的过程(核心)

重平衡开始前

  • 消费者组正常发送HeartbeatRequest请求,coordinator回应请求。

重平衡开始

  • 重平衡条件达成。
  • coordinator开始重平衡,通过心跳响应发送重平衡开始的信息到消费者。
  • 消费者接收到重平衡信息,开始重新加入消费者组。
  • 消费者发送JoinGroup请求
    • 第一个发送的消费者会被选举为leader,同时coordinator给leader返回的消息中包含消费组的消费者列表。Leader的职责是根据coordinator放回的信息来根据分配策略制定消费方案。
    • 其他消费者会收到本组leader的信息。
  • 消费者发送SyncGroup请求
    • Leader发送给coordinator的SyncGroup请求,包含了分区消费方案。coordinator会返回分区消费方案。
    • 其他消费者发送的SyncGroup请求不包含内容,coordinator还是同样返回分区消费方案。

重平衡结束

  • 消费者开始根据接收到的消费方案进行消费。

不管是JoinGroup还是SyncGroup,请求和返回的数据结构对于是不是leader其实是一样的,区别在内容上。

影响

降低TPS。重平衡期间,所有消费者都处于停止消费状态,导致消费者吞吐量降低。更严重的情况是,重平衡的时间可能会非常长,这个时候系统和入土没什么区别了。

可能的重复消费。重平衡开始时,可能消费者还没来得及提交消费位移,然后这个分区因为重平衡被其他消费者接管,在之前的消费位移上继续消费。

效率低下。 比如说,重平衡前后两个不同的消费者消费的分区是一样的,但是出现这种情况时,消费者还是要重新连接到新的broker上。

使用分配策略StickyAssignor时,会尽可能保持消费者可以消费原先的分区,但是目前存在一些bug,所以生产环境使用较少。

减少重平衡

要减少重平衡,就要减少重平衡条件达成的情况出现。

不管是对于主题数量的变化还是分区的变化,这些实际上都是由人为操作造成的,有时候因为一些因素不得不去操作主题或者分区来达到扩展或者其他目的。所以由主题和分区数量变化造成的重平衡是不可避免的。

在消费者数量方面,正常的消费者离组或者组内消费者数量增加也是不可避免的。比如有时候为了增大消费者组的吞吐量,我们可能会去用已有的GroupId来创建一些消费者,相当于是增加了消费组的成员数量,也会引发重平衡,这样的重平衡是可预见并且需要被合理评估和接受的。

接下来真正要控制的因素实际上只有一个了:消费者异常离组。存在以下情况。

  • 消费者无法及时发送心跳。此时Coordinator会认为消费者已经挂了,然后会将消费者移除并开启重平衡。
    • session.timeout.ms 控制Coordinator的心跳等待时长。
    • heartbeat.interval.ms 控制消费者发送心跳的间隔。
    • 建议的配置是设置 session.timeout.ms >= 3 * heartbeat.interval.ms。这样的逻辑是当Coordinator三次接收不到消费者心跳时,才认为消费者已经异常。
  • 消费者消费时间太长,两次拉取时间间隔太大。此时也是认为消费者挂了。
    • max.poll.interval.ms 控制最大拉取时长。
    • 建议是根据消费者的消费能力来选定时间。比如消费者要进行一个入库操作,这个入库操作设置的最大超时时间是5min(网络波动,或者异常重启等情况),那么建议max.poll.interval.ms 设置为6分钟,来保证消费正常进行。
  • GC导致出现以上两种情况。
    • 因为GC会造成STW,所以需要根据情况来修改JVM的启动参数。

参考


Kafka消费者组重平衡
https://blog.avezah.com/tech/20231125-kafka-rebalance/
作者
avezah
发布于
2023年11月25日
许可协议