Kafka源码解读-Consumer重平衡

1,402 阅读14分钟

前言

本文是基于Kafka 2.7版本,参考《Apache Kafka源码剖析》这本书做的学习笔记

什么情况下会触发Rebalance操作

  1. 有新的消费者加入Consumer Group
  2. 有消费者主动退出Consumer Group
  3. 消费者宕机下线。例如遇到长时间的GC、网络延迟导致消费者长时间未向Group Coordinator发送HeartbeatRequest时,Group Coordinator会认为消费者下线,从而触发rebalance操作
  4. Consumer Group订阅的任一Topic出现分区数量的变化
  5. 消费者调用unsubscribe()取消对某Topic的订阅

这是《Apache Kafka源码剖析》上的描述,随着Kafka版本的迭代,这个表述已经不准确了,从kafka2.3版本起,消费者有静态成员的选项,可以避免consumer重启导致的两次rebalance问题,这就解决了第三个问题,当然,在新的版本中,什么情况下才会引起rebalance,我暂时也没完全搞明白,www.bilibili.com/read/cv1164… 这篇博客写得很好,值得再三阅读

什么是Group Coordinator

Coordinator是协调者的意思,每个Consumer Group都会选择一个broker作为自己的Group Coordinator,由Group Coordinator监控所有consumer的心跳,从而管理对应Consumer Group的rebalance,也就是说,这个角色是为协调rebalance而生

Rebalance过程

  1. 当前consumer准备加入Consumer Group或Group Coordinator发生故障转移时,consumer并不知道Group Coordinator的host和port,所以consumer会向Kafka集群中负载最小的Node节点发送FindCoordinatorRequest请求,收到请求的broker节点会返回FindCoordinatorResponse,其中就包含了负责管理该Consumer Group的Group Coordinator的地址
  2. 成功找到Consumer Group对应的Group Coordinator后,consumer会向Group Coordinator发送JoinGroupRequest请求,其中包含consumer的相关信息,Group Coordinator收到JoinGroupRequest后会暂存该consumer信息,等待全部consumer的JoinGroupRequest请求,JoinGroupRequest中的sessionTimeoutMs和rebalanceTimeoutMs就是用来告诉Group Coordinator等待多久的
  3. Group Coordinator会根据全部consumer的JoinGroupRequest请求来确定Consumer Group中可用的consumer,从中选取一个consumer成为leader,同时还会把分区策略和其他consumer的订阅信息发送给leader,每个consumer都会收到JoinGroupResponse响应,但是只有leader收到的JoinGroupResponse响应中封装了所有consumer信息以及leader信息,当其中一个consumer确定了自己的leader地位后,会根据consumer信息、Kafka集群元数据以及partition分配策略计算partition的分片结果,其它非leader consumer收到JoinGroupResponse为空响应,也就不会进行任何操作,只是原地等待
  4. 接下来,所有consumer进入Synchronizing Group State阶段,所有consumer会向Group Coordinator发送SyncGroupRequest,其中,leader consumer的SyncGroupRequest请求中包含了partition分配结果,普通consumer的SyncGroupRequest为空请求
  5. Group Coordinator会将partition分配结果封装成SyncGroupResponse返回给所有consumer
  6. consumer收到SyncGroupResponse后进行解析,就可以明确partition与consumer的映射关系
  7. 后续consumer会与Group Coordinator保持定期的心跳

如下图

image.png

ConsumerNetworkClient

ConsumerNetworkClient在NetworkClient之上进行了封装,主要负责与broker的网络交互

一些重要的成员变量
// ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>>
// 请求的缓冲队列,key是Node节点,value是发往此Node的ClientRequest队列
private final UnsentRequests unsent = new UnsentRequests();
// Kafka集群元数据
private final Metadata metadata;
// 重试退避时间
private final long retryBackoffMs;
// 超时时间,默认5000ms
private final int maxPollTimeoutMs;
// 请求超时时间,默认30000ms
private final int requestTimeoutMs;
// We do not need high throughput, so use a fair lock to try to avoid starvation
private final ReentrantLock lock = new ReentrantLock(true);
// 存储请求的的回调对象,当请求完成时,会将回调对象放入此队列,等待统一回调
private final ConcurrentLinkedQueue<RequestFutureCompletionHandler> pendingCompletion = new ConcurrentLinkedQueue<>();
一些重要的方法

unsent存储的是要发送但是还没有执行的网络请求,我们来看看它是在什么节点放进去的

public RequestFuture<ClientResponse> send(Node node,
                                              AbstractRequest.Builder<?> requestBuilder,
                                              int requestTimeoutMs) {
    long now = time.milliseconds();
    // 初始化回调对象
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    // 构造请求对象
    ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true,
                                                          requestTimeoutMs, completionHandler);
    // 把请求放进缓存队列中
    unsent.put(node, clientRequest);
​
    // wakeup the client in case it is blocking in poll so that we can send the queued request
    // 唤醒请求客户端
    client.wakeup();
    return completionHandler.future;
}

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll()方法有多个重载,最终会调用poll(Timer timer, PollCondition pollCondition, boolean disableWakeup),timer用于判断网络请求的阻塞时间,pollCondition用于判断是否进行阻塞

public void poll(Timer timer, PollCondition pollCondition, boolean disableWakeup) {
    // 处理回调方法
    firePendingCompletedRequests();
​
    lock.lock();
    try {
        handlePendingDisconnects();
​
        // 尝试发送可以发送的请求
        long pollDelayMs = trySend(timer.currentTimeMs());
​
        // 根据pollCondition条件判断是否需要阻塞直至请求成功
        // 比如消费者拉取消息时发现缓存中已经没有缓存的消息,这时候就需要阻塞发送到服务端的请求知道请求成功
        if (pendingCompletion.isEmpty() && (pollCondition == null || pollCondition.shouldBlock())) {
            long pollTimeout = Math.min(timer.remainingMs(), pollDelayMs);
            if (client.inFlightRequestCount() == 0)
                pollTimeout = Math.min(pollTimeout, retryBackoffMs);
            client.poll(pollTimeout, timer.currentTimeMs());
        } else {
            client.poll(0, timer.currentTimeMs());
        }
        timer.update();
​
        ......
    } finally {
        lock.unlock();
    }
​
    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
​
    metadata.maybeThrowAnyException();
}

ConsumerNetworkClient.trySend()方法循环处理unsent中缓存的请求,具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready()方法检测consumer与此节点之间的连接,若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队列中等待响应

long trySend(long now) {
    long pollDelayMs = maxPollTimeoutMs;
​
    // send any requests that can be sent now
    // 按节点从unsent集合中获取ClientRequest
    for (Node node : unsent.nodes()) {
      Iterator<ClientRequest> iterator = unsent.requestIterator(node);
      if (iterator.hasNext())
        // 计算超时时间,这个超时时间会作为Selector最长阻塞时间
        pollDelayMs = Math.min(pollDelayMs, client.pollDelayMs(node, now));
​
      while (iterator.hasNext()) {
        ClientRequest request = iterator.next();
        // 调用NetworkClient.ready()检测是否可以发送请求
        if (client.ready(node, now)) {
          // 调用NetworkClient.send(),等待发送请求
          client.send(request, now);
          // 删除队列中对应请求
          iterator.remove();
        } else {
          break;
        }
      }
    }
    // 返回超时时间
    return pollDelayMs;
}

RequestFutureAdapter

ConsumerNetworkClient负责最底层的网络发送,那么这些网络请求的Request跟Response是怎么处理的呢?不得不说RequestFutureAdapter这个类

image.png RequestFutureAdapter是适配器的抽象类,定义了onSuccess()和onFailure(),如上图,它的实现类有HeartbeatResponseHandler、JoinGroupResponseHandler、LeaveGroupResponseHandler、SyncGroupResponseHandler等,看名字就知道是用来处理FindCoordinatorRequest、JoinGroupRequest等请求的,下面我们看下FindCoordinatorResponseHandler的代码来感受一下查找Group Coordinator的Response是怎么处理的

private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
​
    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received FindCoordinator response {}", resp);
        // 请求Group Coordinator节点成功,服务端的回应
        FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
        Errors error = findCoordinatorResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                // 使用Integer.MAX_VALUE - node.id作为协调器id以允许单独的连接
                int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
                // 构建Group Coordinator节点的Node对象
                AbstractCoordinator.this.coordinator = new Node(
                  coordinatorConnectionId,
                  findCoordinatorResponse.data().host(),
                  findCoordinatorResponse.data().port());
                log.info("Discovered group coordinator {}", coordinator);
                // 尝试与Group Coordinator节点建立连接
                client.tryConnect(coordinator);
                heartbeat.resetSessionTimeout();
            }
            // 响应传播
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
        } else {
            log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
            future.raise(error);
        }
    }
​
    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
       ......
    }
}

那为什么要定义这个RequestFutureAdapter呢?

定义某个规范,比如抽象类、接口,一般都是因为类似的场景很多,定义规范便于管理,上文讲过,consumer与broker的交互是很多的,每次交互都有response,不同的response处理方式是不同的,通过继承RequestFutureAdapter可以做到每个实现类单独处理某个response,减少大量的if else,那这个RequestFutureAdapter是怎么使用的呢

断点在FindCoordinatorResponseHandler类上,找到整条调用链

image.png

调用链如下

org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#poll

org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient#firePendingCompletedRequests

org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler#fireCompletion

org.apache.kafka.clients.consumer.internals.RequestFuture#complete

org.apache.kafka.clients.consumer.internals.RequestFuture#fireSuccess

org.apache.kafka.clients.consumer.internals.RequestFutureListener#onSuccess

看图比较形象

image.png

// 上文说到,RequestFutureAdapter是用于跟broker的交互处理回调使用的
// 在发送请求给broker前,会先放一个监听器在队列中
public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
    final RequestFuture<S> adapted = new RequestFuture<>();
    addListener(new RequestFutureListener<T>() {
        @Override
        public void onSuccess(T value) {
          adapter.onSuccess(value, adapted);
        }
​
        @Override
        public void onFailure(RuntimeException e) {
          adapter.onFailure(e, adapted);
        }
    });
    return adapted;
}
​
​
// 请求回来之后会调用这个接口
public void fireCompletion() {
    // 如果有异常,走future.raise()
    if (e != null) {
      future.raise(e);
    } else if (response.authenticationException() != null) {
      future.raise(response.authenticationException());
    } else if (response.wasDisconnected()) {
      log.debug("Cancelled request with header {} due to node {} being disconnected",
                response.requestHeader(), response.destination());
      future.raise(DisconnectException.INSTANCE);
    } else if (response.versionMismatch() != null) {
      future.raise(response.versionMismatch());
    } else {
      // 如果无异常,走future.complete()
      future.complete(response);
    }
}
​
// 只看无异常的逻辑,有异常的逻辑是相似
// 无异常会调用这个方法
public void complete(T value) {
    try {
        if (value instanceof RuntimeException)
            throw new IllegalArgumentException("The argument to complete can not be an instance of RuntimeException");
​
        if (!result.compareAndSet(INCOMPLETE_SENTINEL, value))
            throw new IllegalStateException("Invalid attempt to complete a request future which is already complete");
        // 主要是调用这个方法
        fireSuccess();
    } finally {
        completedLatch.countDown();
    }
}
​
​
private void fireSuccess() {
    T value = value();
    while (true) {
      // 这里拿出发送请求前放在队列中的监听器
      RequestFutureListener<T> listener = listeners.poll();
      if (listener == null)
        break;
      // 此处回调监听器
      listener.onSuccess(value);
    }
}

RequestFuture

RequestFuture 是一个泛型类,其核心方法如下

// RequestFutureListener集合,用来监听请求完成的情况RequestFutureListener, 接口有onSuccess和 
// onFailure()两个方法,对应于请求正常完成和出现异常两种情况
private final ConcurrentLinkedQueue<RequestFutureListener<T>> listeners = new ConcurrentLinkedQueue<>();
​
// 表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为 true
public boolean isDone() {
   return result.get() != INCOMPLETE_SENTINEL;
}
// 记录请求正常完成时收到的响应,与 exception 字段互斥。此字段非空表示正常完成,反之表示出现异常。 
public T value() {
    if (!succeeded())
      throw new IllegalStateException("Attempt to retrieve value from future which hasn't successfully completed");
    return (T) result.get();
}
​
// 记录导致请求异常完成的异常类,与 value 字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
public RuntimeException exception() {
  if (!failed())
    throw new IllegalStateException("Attempt to retrieve exception from future which hasn't failed");
  return (RuntimeException) result.get();
}
​
​

Rebalance代码

ConsumerCoordinator

上文讲了rebalance的原理,在KafkaConsumer中通过ConsumerCoordinator组件实现与服务端Group Coordinator的交互,ConsumerCoordinator继承了抽象类AbstractCoordinator,下面我们先来介绍AbstractCoordinator的重要成员变量

public abstract class AbstractCoordinator implements Closeable {
    protected enum MemberState {
        // 消费者还没加入到服务端Group Coordinator中
        UNJOINED,            
        // 消费者发出了加入Group Coordinator的请求,但是还没收到响应
        PREPARING_REBALANCE,  
        // 消费者收到了加入Group Coordinator的响应
        COMPLETING_REBALANCE, 
        // 消费者发出了加入Group Coordinator的请求,并正在向Group Coordinator发送心跳
        STABLE;              
​
        public boolean hasNotJoinedGroup() {
            return equals(UNJOINED) || equals(PREPARING_REBALANCE);
        }
    }
    // 心跳任务的辅助类
    private final Heartbeat heartbeat;
    // Node的类对象,保存着Group Coordinator的节点信息
    private Node coordinator = null;
    // 是否重新发送加入Group Coordinator的请求
    private boolean rejoinNeeded = true;
    private boolean needsJoinPrepare = true;
    // 维持心跳的线程,加入到Group Coordinator后,会发起定时的心跳线程
    private HeartbeatThread heartbeatThread = null;
    // 加入Group Coordinator的异步请求类对象
    private RequestFuture<ByteBuffer> joinFuture = null;
    // consumer查找Group Coordinator的异步请求类对象
    private RequestFuture<Void> findCoordinatorFuture = null;
    private volatile RuntimeException fatalFindCoordinatorException = null;
    // Group Coordinator发过来的Rebalance年代,用来区分两次rebalance操作,由于网络延迟等问题
    // 在执行rebalance操作时可能收到上次rebalance过程的请求,避免这种干扰,每次rebalance操作都会递增generation的值
    private Generation generation = Generation.NO_GENERATION;
    // 最后一次rebalance的开始时间
    private long lastRebalanceStartMs = -1L;
    // 最后一次rebalance的结束时间
    private long lastRebalanceEndMs = -1L;
}

再看看ConsumerCoordinator的重要成员变量

public final class ConsumerCoordinator extends AbstractCoordinator {
    private final GroupRebalanceConfig rebalanceConfig;
    private final Logger log;
    // 订阅分区任务列表,列表的元素是ConsumerPartitionAssignor,ConsumerPartitionAssignor里是发送
    // JoinGroupRequest请求中消费者支持的分区算法等信息,Group Coordinator会从所有消费者都支持的算法中选择一个,
    // 并通知leader consumer使用这个分区算法进行分配,一个消费者可以包含多个分区算法,可以在      
    // partition.assignment.strategy参数中配置
    private final List<ConsumerPartitionAssignor> assignors;
    // 消费者元数据
    private final ConsumerMetadata metadata;
    private final ConsumerCoordinatorMetrics sensors;
    // 订阅状态,保存了主题分区和offset的对应关系
    private final SubscriptionState subscriptions;
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    // 是否自动提交offset
    private final boolean autoCommitEnabled;
    // 提交offset的时间间隔
    private final int autoCommitIntervalMs;
    private final ConsumerInterceptors<?, ?> interceptors;
    private final AtomicInteger pendingAsyncCommits;
​
    // this collection must be thread-safe because it is modified from the response handler
    // of offset commit requests, which may be invoked from the heartbeat thread
    // 提交offset的任务列表,队列的元素是提交offset后处理响应的回调对象,OffsetCommitCompletion是发出
    // 提交offset的请求后,会把处理响应的回调对象放到这个队列里
    private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;
    // 是否leader
    private boolean isLeader = false;
    private Set<String> joinedSubscription;
    // 保存分区信息,用来监控分区信息是否变了
    private MetadataSnapshot metadataSnapshot;
    private MetadataSnapshot assignmentSnapshot;
    private Timer nextAutoCommitTimer;
    private AtomicBoolean asyncCommitFenced;
    // 消费者元数据
    private ConsumerGroupMetadata groupMetadata;
    private final boolean throwOnFetchStableOffsetsUnsupported;
}
第一阶段

rebalance操作的第一步就是查找Group Coordinator,这个阶段consumer会向Kafka集群中的任意一个broker发送FindCoordinatorRequest,并使用FindCoordinatorResponseHandler处理返回的response,发送请求的入口在org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    // 判断是否与Group Coordinator建立连接
    if (!coordinatorUnknown())
      return true;
​
    do {
      if (fatalFindCoordinatorException != null) {
        final RuntimeException fatalException = fatalFindCoordinatorException;
        fatalFindCoordinatorException = null;
        throw fatalException;
      }
      // 如果未建立连接,则开始查找Group Coordinator的预处理
      final RequestFuture<Void> future = lookupCoordinator();
      // 不断请求集群中任一节点直到返回Group Coordinator的信息
      client.poll(future, timer);
​
      // 超时则退出循环
      if (!future.isDone()) {
        // ran out of time
        break;
      }
​
      RuntimeException fatalException = null;
      // 异常处理
      if (future.failed()) {
        if (future.isRetriable()) {
          log.debug("Coordinator discovery failed, refreshing metadata", future.exception());
          // 如果出现RetriableException,调用awaitMetadataUpdate()方法阻塞更新Metadata中记录的集群元数据
          // 成功后重新请求
          client.awaitMetadataUpdate(timer);
        } else {
          fatalException = future.exception();
          log.info("FindCoordinator request hit fatal exception", fatalException);
        }
        // 成功找到Group Coordinator节点,但是网络连接失败
      } else if (coordinator != null && client.isUnavailable(coordinator)) {
        // 将Group Coordinator置为null,退避一段时间后重新查找
        markCoordinatorUnknown("coordinator unavailable");
        timer.sleep(rebalanceConfig.retryBackoffMs);
      }
      // 将异步请求类对象置为空,等待下一次请求 
      clearFindCoordinatorFuture();
      if (fatalException != null)
        throw fatalException;
      // 不断尝试获取Group Coordinator的信息,直到成功或者超时
    } while (coordinatorUnknown() && timer.notExpired());
​
    return !coordinatorUnknown();
}

下面来看看lookupCoordinator实现

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
      // find a node to ask about the coordinator
      // 找到负载最小的node节点
      Node node = this.client.leastLoadedNode();
      if (node == null) {
        log.debug("No broker available to send FindCoordinator request");
        return RequestFuture.noBrokersAvailable();
      } else {
        // 构建请求并放进缓存队列中
        findCoordinatorFuture = sendFindCoordinatorRequest(node);
      }
    }
    return findCoordinatorFuture;
}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending FindCoordinator request to broker {}", node);
    // 构建查找Group Coordinator节点的请求
    FindCoordinatorRequest.Builder requestBuilder =
      new FindCoordinatorRequest.Builder(
      new FindCoordinatorRequestData()
      .setKeyType(CoordinatorType.GROUP.id())
      .setKey(this.rebalanceConfig.groupId));
    // 发送请求,并用FindCoordinatorResponseHandler类对象来处理响应
    return client.send(node, requestBuilder)
      .compose(new FindCoordinatorResponseHandler());
}

看看第一阶段的请求报文

image.png

FindCoordinatorResponseHandler类是用来处理response的

先看看response的报文,很明显可以看到broker所在的Node节点的host跟port

image.png

private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
​
    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        log.debug("Received FindCoordinator response {}", resp);
        // 请求Group Coordinator节点成功,服务端的回应
        FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
        Errors error = findCoordinatorResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                // 使用Integer.MAX_VALUE - node.id作为协调器id以允许单独的连接
                int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
                // 构建Group Coordinator节点的Node对象
                AbstractCoordinator.this.coordinator = new Node(
                  coordinatorConnectionId,
                  findCoordinatorResponse.data().host(),
                  findCoordinatorResponse.data().port());
                log.info("Discovered group coordinator {}", coordinator);
                // 尝试与Group Coordinator节点建立连接
                client.tryConnect(coordinator);
                heartbeat.resetSessionTimeout();
            }
            // 响应传播
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
        } else {
            log.debug("Group coordinator lookup failed: {}", findCoordinatorResponse.data().errorMessage());
            future.raise(error);
        }
    }
​
    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
       ......
    }
}
第二阶段

在成功查找到对应的Group Coordinator之后进入Join Group阶段,在此阶段,consumer会向Group Coordinator发送JoinGroupRequest请求,并处理响应

boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
      // 常规套路,再次检查是否已与Group Coordinator建立连接
      if (!ensureCoordinatorReady(timer)) {
        return false;
      }
      
      // needsJoinPrepare 这个标志位是为了不二次调用onJoinPrepare()这个方法
      if (needsJoinPrepare) {
        // 如果rebalance失败,下次进行rebalance不需要走进这个逻辑
        needsJoinPrepare = false;
        // 提交offset 和 调用用户自定义的ConsumerRebalanceListener,有些场景用户想要在Rebalance之前做一些事情
        // 比如保存消费的offset,避免重复消费
        onJoinPrepare(generation.generationId, generation.memberId);
      }
      // 初始化joinGroup请求,并发送该请求
      final RequestFuture<ByteBuffer> future = initiateJoinGroup();
      client.poll(future, timer);
      if (!future.isDone()) {
        // we ran out of time
        return false;
      }
      // 如果异步请求成功了
      if (future.succeeded()) {
        ......
      }
    }
    return true;
}
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        
    if (joinFuture == null) {
      // 设置consumer的状态为PREPARING_REBALANCE,表示开始重平衡了
      state = MemberState.PREPARING_REBALANCE;
      // 如果前一个rebalance失败,可以连续触发下一个rebalance
      // 在这种情况下,我们不会更新开始时间
      if (lastRebalanceStartMs == -1L)
        lastRebalanceStartMs = time.milliseconds();
      // 发送JoinGroupRequest
      joinFuture = sendJoinGroupRequest();
      // 为joinFuture加一个回调的监听器
      joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
        @Override
        public void onSuccess(ByteBuffer value) {
          // 成功不会做任何事,因为逻辑都在JoinGroupRequest的回调对象JoinGroupResponseHandler里
        }
​
        @Override
        public void onFailure(RuntimeException e) {
          // we handle failures below after the request finishes. if the join completes
          // after having been woken up, the exception is ignored and we will rejoin;
          // this can be triggered when either join or sync request failed
          synchronized (AbstractCoordinator.this) {
            sensors.failedRebalanceSensor.record();
          }
        }
      });
    }
    return joinFuture;
}
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
      return RequestFuture.coordinatorNotAvailable();
​
    // send a join group request to the coordinator
    log.info("(Re-)joining group");
    // 构建加入Group Coordinator的请求
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
      new JoinGroupRequestData()
      // group.id配置
      .setGroupId(rebalanceConfig.groupId)
      // 消费者定期发送心跳证明自己的存活,如果在这个时间之内broker没收到,那broker就将此消费者从group中移除
      // 需要注意的是,这个值必须在broker属性group.min.session.timeout.ms和group.max.session.timeout.ms的值中间
      .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
      .setMemberId(this.generation.memberId)
      // 用户提供的消费者实例的唯一标识符,如果设置了,消费者则被视为静态成员,静态成员配以较大的session超时设置能够避免
      // 因成员临时不可用(比如重启)而引起的rebalance,如果不设置,消费者将作为动态成员
      .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
      // consumer发起的协议,为consumer
      .setProtocolType(protocolType())
      // 分区分配策略和对应的订阅元信息
      .setProtocols(metadata())
      // 默认为300000(5分钟),如果消费者两次poll的时间超过了此值,那就认为消费者能力不足,会触发rebalance,将该消费者
      // 消费的分区分配给它人
      .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
    );
​
    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);
​
    // Note that we override the request timeout using the rebalance timeout since that is the
    // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
    int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
                                      rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    // 发送请求,并设置处理返回值的对象
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
      .compose(new JoinGroupResponseHandler(generation));
 }

看看发送的报文长什么样子 image.png

看看response长什么样子

image.png

private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    private JoinGroupResponseHandler(final Generation generation) {
      super(generation);
    }
​
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
      Errors error = joinResponse.error();
      if (error == Errors.NONE) {
        // 协议类型不一致
        if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
          log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
                    joinResponse.data().protocolType(), protocolType());
          future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
        } else {
          log.debug("Received successful JoinGroup response: {}", joinResponse);
          sensors.joinSensor.record(response.requestLatencyMs());
​
          synchronized (AbstractCoordinator.this) {
            if (state != MemberState.PREPARING_REBALANCE) {
              future.raise(new UnjoinedGroupException());
            } else {
              // 更新状态为COMPLETING_REBALANCE
              state = MemberState.COMPLETING_REBALANCE;
​
              // 启动心跳线程
              if (heartbeatThread != null)
                heartbeatThread.enable();
​
              AbstractCoordinator.this.generation = new Generation(
                joinResponse.data().generationId(),
                joinResponse.data().memberId(), joinResponse.data().protocolName());
​
              log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);
              // 判断是否leader节点
              if (joinResponse.isLeader()) {
                // 如果是leader节点,需要计算一下新的分配方案,再发送给Group Coordinator
                onJoinLeader(joinResponse).chain(future);
              } else {
                onJoinFollower().chain(future);
              }
            }
          }
        }
      } 
      ......
  }

主要看一下leader节点是怎么操作的

private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        // 基于响应返回的分区策略和消费者里所有的分区信息,计算出每个消费者分配的分区
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                                                                    joinResponse.data().members());
​
        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
          groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                                  .setMemberId(assignment.getKey())
                                  .setAssignment(Utils.toArray(assignment.getValue()))
                                 );
        }
        // 构建发送分区消费方案的请求
        SyncGroupRequest.Builder requestBuilder =
          new SyncGroupRequest.Builder(
          new SyncGroupRequestData()
          .setGroupId(rebalanceConfig.groupId)
          .setMemberId(generation.memberId)
          .setProtocolType(protocolType())
          .setProtocolName(generation.protocolName)
          .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
          .setGenerationId(generation.generationId)
          .setAssignments(groupAssignmentList)
        );
        log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}", this.coordinator, this.generation, requestBuilder);
        // 发送分区消费方案的请求
        return sendSyncGroupRequest(requestBuilder);
      } catch (RuntimeException e) {
        return RequestFuture.failure(e);
      }
}
private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
    if (coordinatorUnknown())
      return RequestFuture.coordinatorNotAvailable();
    return client.send(coordinator, requestBuilder)
      .compose(new SyncGroupResponseHandler(generation));
}

看看第三阶段的报文长什么样子

image.png

第三阶段

完成分区分配之后就进入了Synchronizing Group State阶段,主要逻辑是向Group Coordinator发送SyncGroupRequest请求并处理SyncGroupResponse响应

看看报文长什么样子

image.png

public void handle(SyncGroupResponse syncResponse,
                           RequestFuture<ByteBuffer> future) {
  Errors error = syncResponse.error();
  // 如果响应没有异常
  if (error == Errors.NONE) {
    ......
​
      synchronized (AbstractCoordinator.this) {
        if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
          ......
          } else {
            log.info("Successfully synced group in generation {}", generation);
            // 修改rebalance状态
            state = MemberState.STABLE;
            rejoinNeeded = false;
            // 记录rebalance完成时间
            lastRebalanceEndMs = time.milliseconds();
            sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
            lastRebalanceStartMs = -1L;
            // 传播rebalance完成事件
            future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
          }
        } else {
          log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
                   "receiving SyncGroup response, marking this rebalance as failed and retry",
                   generation, state);
          // use ILLEGAL_GENERATION error code to let it retry immediately
          future.raise(Errors.ILLEGAL_GENERATION);
        }
      }
    }
  } else {
    ......
  }

处理完SyncGroupResponse后会调用下面这个方法设置订阅的分区

protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
  log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);
​
  // 只有leader consumer才关心元数据的变化
  if (!isLeader)
    assignmentSnapshot = null;
  // 查找使用的分区策略
  ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
  if (assignor == null)
    throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
​
  // Give the assignor a chance to update internal state based on the received assignment
  groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
​
  Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
​
  
  // 反序列化,更新assignment
  Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
  // 获取分配的分区
  Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
​
  ......
​
  final AtomicReference<Exception> firstException = new AtomicReference<>(null);
  Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
  addedPartitions.removeAll(ownedPartitions);
​
  ......
  // 设置订阅的分区
  subscriptions.assignFromSubscribed(assignedPartitions);
​
  ......
}

至此,消费者便可以开始消费数据了

参考资料

xie.infoq.cn/article/172…

zhuanlan.zhihu.com/p/442486261