In Kafka, a single partition of a topic can be consumed by only one consumer within a given group, while consumers in different groups can consume it concurrently. To make this work, Kafka has to manage and assign the partitions consumed within a group; that responsibility belongs to the coordinator.

Key terms

coordinator

  • A Consumer Group is globally identified by its group.id (a String)
  • A group can contain zero, one, or more consumer clients
  • A group can consume zero, one, or more topics
  • Each consumer client in a group can subscribe to one or more partitions of a topic at the same time
  • Within one group, a partition can be subscribed to by only one client; clients in other groups are unaffected. If several consumers in the same group consumed one partition, their offsets on that partition would likely diverge, so after a rebalance the client newly assigned to the partition would have no well-defined starting point. That would also break the consistency of partition consumption in Kafka, hence the one-client-per-partition rule within a group.

rebalance

It is essentially a protocol that specifies how all consumers in a Consumer Group reach agreement on how to distribute the partitions of the subscribed topics. For example, suppose a group has 20 consumer instances and subscribes to a topic with 100 partitions; under normal circumstances Kafka assigns each consumer 5 partitions evenly. That assignment process is the rebalance.
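The even split described above can be sketched with a tiny round-robin assignor. This is only an illustrative stand-in for what the group leader's assignor does conceptually, not Kafka's actual RangeAssignor or RoundRobinAssignor; the member names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Spread partition indexes 0..partitions-1 across member ids round-robin,
    // mimicking what the group leader's assignor does conceptually.
    public static Map<String, List<Integer>> assign(List<String> members, int partitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            members.add("consumer-" + i);
        }
        // 100 partitions over 20 consumers: 5 partitions each
        Map<String, List<Integer>> assignment = assign(members, 100);
        System.out.println(assignment.get("consumer-0")); // [0, 20, 40, 60, 80]
    }
}
```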

Note that during a rebalance every consumer in the group stops consuming, which hurts consumption throughput considerably, so rebalances should be avoided whenever possible.
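One common way to reduce spurious rebalances is to tune the heartbeat and poll timeouts so that a slow consumer is not mistaken for a dead one. The config keys below are real consumer settings, but the values are purely illustrative assumptions and should be sized for your workload:

```java
import java.util.Properties;

public class RebalanceTuning {
    // Illustrative (not prescriptive) consumer settings that govern when the
    // coordinator considers a member dead and triggers a rebalance.
    public static Properties tunedProps() {
        Properties props = new Properties();
        // heartbeats must arrive within this window or the member is evicted
        props.put("session.timeout.ms", "30000");
        // should be well below session.timeout.ms (commonly about one third of it)
        props.put("heartbeat.interval.ms", "10000");
        // the longest the application may go between poll() calls before eviction
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("session.timeout.ms")); // prints "30000"
    }
}
```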

A simple illustration of the rebalance protocol

Source code walkthrough

How a consumer fetches messages

Consumers fetch messages through KafkaConsumer.poll(), which is also one entry point into the rebalance process.

The call chain leading up to the coordinator, simplified:

KafkaConsumer.poll(final Duration timeout)
-> KafkaConsumer.poll(final Timer timer, final boolean includeMetadataInTimeout)
-> KafkaConsumer.updateAssignmentMetadataIfNeeded
-> coordinator.poll

public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(time.timer(timeout), true);
}

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();
            // defaults to true
            if (includeMetadataInTimeout) {
                // the coordinator is driven from here
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                // the coordinator is driven from here
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}

KafkaConsumer.poll calls updateAssignmentMetadataIfNeeded, which in turn drives the coordinator's poll method.

boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    // drive the coordinator's poll method
    if (coordinator != null && !coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}

ConsumerCoordinator.poll

The core of the rebalance is the ensureActiveGroup method, which issues the JoinGroup and SyncGroup requests.

public boolean poll(Timer timer) {
    // update subscription metadata
    maybeUpdateSubscriptionMetadata();
    // invoke callbacks for completed offset commits
    invokeCompletedOffsetCommitCallbacks();
    // automatic partition assignment mode
    if (subscriptions.partitionsAutoAssigned()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                    " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        // refresh the heartbeat
        pollHeartbeat(timer.currentTimeMs());
        // locate the coordinator and establish a connection to it
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }
        // check whether anything requires rejoining the group
        if (rejoinNeededOrPending()) {
            // refresh metadata before rejoining the group (for pattern subscriptions)
            if (subscriptions.hasPatternSubscription()) {

                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }
                // apply the refreshed metadata if needed
                maybeUpdateSubscriptionMetadata();
            }
            // send the JoinGroup and SyncGroup requests, i.e. perform the rebalance,
            // and start the heartbeat thread
            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
invokeCompletedOffsetCommitCallbacks
void invokeCompletedOffsetCommitCallbacks() {
    if (asyncCommitFenced.get()) {
        throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
                + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
                + ", current member.id is " + memberId());
    }
    while (true) {
        // drain the callbacks of completed offset commits
        OffsetCommitCompletion completion = completedOffsetCommits.poll();
        if (completion == null) {
            break;
        }
        completion.invoke();
    }
}
pollHeartbeat
protected synchronized void pollHeartbeat(long now) {
    if (heartbeatThread != null) {
        if (heartbeatThread.hasFailed()) {
            // set the heartbeat thread to null and raise an exception. If the user catches it,
            // the next call to ensureActiveGroup() will spawn a new heartbeat thread.
            RuntimeException cause = heartbeatThread.failureCause();
            heartbeatThread = null;
            throw cause;
        }
        // Awake the heartbeat thread if needed
        if (heartbeat.shouldHeartbeat(now)) {
            notify();
        }
        heartbeat.poll(now);
    }
}
Ensuring the coordinator is ready: ensureCoordinatorReady
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
    // the coordinator is already known
    if (!coordinatorUnknown())
        return true;
    // keep looking in a loop
    do {
        // build the lookup request
        final RequestFuture<Void> future = lookupCoordinator();
        // send the request
        client.poll(future, timer);

        if (!future.isDone()) {
            // ran out of time
            break;
        }

        if (future.failed()) {
            // on a retriable lookup failure, refresh metadata first
            if (future.isRetriable()) {
                log.debug("Coordinator discovery failed, refreshing metadata");
                client.awaitMetadataUpdate(timer);
            } else
                throw future.exception();
        } else if (coordinator != null && client.isUnavailable(coordinator)) { // found, but unreachable
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            markCoordinatorUnknown();
            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    } while (coordinatorUnknown() && timer.notExpired());

    return !coordinatorUnknown();
}

Finding the Coordinator node

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        // pick the least-loaded node
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            log.debug("No broker available to send FindCoordinator request");
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendFindCoordinatorRequest(node);
    }
    return findCoordinatorFuture;
}

// issue the request that looks up the Coordinator node
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending FindCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(
                    new FindCoordinatorRequestData()
                            .setKeyType(CoordinatorType.GROUP.id())
                            .setKey(this.rebalanceConfig.groupId));
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}

To find the coordinator, the consumer first sends a FindCoordinator request to the broker that currently has the lowest load. That broker then computes which node is the coordinator for the group: it hashes the groupId and takes the result modulo the partition count of the __consumer_offsets topic, and the broker hosting that partition is the group's coordinator.

Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

This also means that all committed offsets for the group are recorded in that one partition of the __consumer_offsets topic.

The __consumer_offsets topic has 50 partitions by default.
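The formula can be reproduced in a few lines. The abs helper below is a sketch that guards against Integer.MIN_VALUE, whose Math.abs is still negative; Kafka's own Utils.abs has to handle the same edge case. The group id used in main is a made-up example.

```java
public class CoordinatorPartition {
    // Guard against Integer.MIN_VALUE, whose Math.abs stays negative;
    // an assumption modeled on what Kafka's Utils.abs must handle.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Mirrors Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount:
    // the __consumer_offsets partition (and hence the broker) that owns the group.
    public static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return abs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // 50 is the default partition count of __consumer_offsets
        System.out.println(partitionFor("my-group", 50)); // always within [0, 50)
    }
}
```

Because the mapping is a pure function of the groupId, every broker computes the same coordinator for a given group.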

rejoinNeededOrPending: does the group need a rebalance?
public boolean rejoinNeededOrPending() {
    if (!subscriptions.partitionsAutoAssigned())
        return false;

    // we need to rejoin if we performed the assignment and metadata has changed;
    // also for those owned-but-no-longer-existed partitions we should drop them as lost
    if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot))
        return true;

    // we need to join if our subscription has changed since the last join
    if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) {
        return true;
    }

    return super.rejoinNeededOrPending();
}

Only the auto-assign mode can require a rejoin. A rejoin, and hence a rebalance, is needed when the subscribed topic list changes or the partitions of a subscribed topic change. One further condition also triggers it: a new consumer joining the group, or an existing consumer leaving voluntarily or crashing.

The heart of the rebalance: ensureActiveGroup
boolean ensureActiveGroup(final Timer timer) {
    // double-check that the coordinator is ready
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }
    // start the heartbeat thread if there is none
    startHeartbeatThreadIfNeeded();
    // join the group
    return joinGroupIfNeeded(timer);
}
Starting the heartbeat thread: startHeartbeatThreadIfNeeded

A dedicated heartbeat thread is started:

private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}
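Conceptually, the heartbeat thread is a background loop that keeps telling the coordinator the member is alive. The following is a much-simplified, self-contained sketch of that idea, not the real HeartbeatThread; the interval, beat count, and counter are all invented for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class HeartbeatSketch {
    // Simplified stand-in for Kafka's HeartbeatThread: a daemon loop that
    // "sends" a heartbeat at a fixed interval, then signals completion.
    public static Thread startHeartbeat(AtomicInteger sent, CountDownLatch done,
                                        long intervalMs, int beats) {
        Thread t = new Thread(() -> {
            try {
                for (int i = 0; i < beats; i++) {
                    sent.incrementAndGet(); // the real thread sends a HeartbeatRequest here
                    Thread.sleep(intervalMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        t.setDaemon(true); // mirrors the real heartbeat thread being a daemon
        t.start();
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        startHeartbeat(sent, done, 10, 3);
        done.await(5, TimeUnit.SECONDS);
        System.out.println(sent.get()); // 3
    }
}
```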

Joining the group: joinGroupIfNeeded
boolean joinGroupIfNeeded(final Timer timer) {
    // keep going while a rejoin is needed
    while (rejoinNeededOrPending()) {
        // the coordinator must be known and ready
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        // guards against being woken up partway through
        if (needsJoinPrepare) {
            // need to set the flag before calling onJoinPrepare since the user callback may throw
            // exception, in which case upon retry we should not retry onJoinPrepare either.
            needsJoinPrepare = false;
            // preparation before joining the group;
            // this invokes the rebalance listener callbacks
            onJoinPrepare(generation.generationId, generation.memberId);
        }
        // send the JoinGroup request;
        // its response is handled by JoinGroupResponseHandler,
        // which issues the SyncGroup request based on the response
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }
        // joined the group successfully
        if (future.succeeded()) {
            Generation generationSnapshot;

            // Generation data maybe concurrently cleared by Heartbeat thread.
            // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
            // and shouldn't block heartbeat thread.
            // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
            }
            if (generationSnapshot != Generation.NO_GENERATION) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();
                // callback for a successful join;
                // re-subscribes to the assigned topics
                onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment);

                // Generally speaking we should always resetJoinGroupFuture once the future is done, but here
                // we can only reset the join group future after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            } else {
                log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
                resetStateAndRejoin();
                resetJoinGroupFuture();
                return false;
            }
        } else { // the join failed, retry
            resetJoinGroupFuture();
            final RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException ||
                    exception instanceof MemberIdRequiredException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            // back off before retrying
            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    }
    return true;
}
Preparation before joining the group: onJoinPrepare

The preparation work done before joining the group:

protected void onJoinPrepare(int generation, String memberId) {
    log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
    // commit offsets prior to rebalance if auto-commit enabled
    maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));

    // the generation / member-id can possibly be reset by the heartbeat thread
    // upon getting errors or heartbeat timeouts; in this case whatever is previously
    // owned partitions would be lost, we should trigger the callback and cleanup the assignment;
    // otherwise we can proceed normally and revoke the partitions depending on the protocol,
    // and in that case we should only change the assignment AFTER the revoke callback is triggered
    // so that users can still access the previously owned partitions to commit offsets etc.
    Exception exception = null;
    final Set<TopicPartition> revokedPartitions;
    if (generation == Generation.NO_GENERATION.generationId &&
            memberId.equals(Generation.NO_GENERATION.memberId)) {
        revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());

        if (!revokedPartitions.isEmpty()) {
            log.info("Giving away all assigned partitions as lost since generation has been reset," +
                    "indicating that consumer is no longer part of the group");
            exception = invokePartitionsLost(revokedPartitions);
            // clear the assignment
            subscriptions.assignFromSubscribed(Collections.emptySet());
        }
    } else {
        switch (protocol) {
            case EAGER:
                // revoke all partitions
                revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                exception = invokePartitionsRevoked(revokedPartitions);

                subscriptions.assignFromSubscribed(Collections.emptySet());

                break;

            case COOPERATIVE:
                // only revoke those partitions that are not in the subscription any more.
                Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                revokedPartitions = ownedPartitions.stream()
                        .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                        .collect(Collectors.toSet());

                if (!revokedPartitions.isEmpty()) {
                    // invoke the rebalance listener callbacks
                    exception = invokePartitionsRevoked(revokedPartitions);

                    ownedPartitions.removeAll(revokedPartitions);
                    // drop the revoked partitions from the assignment
                    subscriptions.assignFromSubscribed(ownedPartitions);
                }

                break;
        }
    }

    isLeader = false;
    // update group subscription info: groupSubscription = Collections.emptySet()
    subscriptions.resetGroupSubscription();

    if (exception != null) {
        throw new KafkaException("User rebalance callback throws an error", exception);
    }
}
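The filtering done by the COOPERATIVE branch can be isolated into a toy model. Plain "topic-partition" strings stand in for TopicPartition objects here, and the topic names are invented; the point is only the rule "revoke a partition only when its topic has left the subscription".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class CooperativeRevokeSketch {
    // Toy model of the COOPERATIVE branch: keep owned partitions whose topic
    // is still subscribed, revoke only the rest. A "topic-partition" string
    // stands in for a TopicPartition object.
    public static Set<String> revoked(Set<String> owned, Set<String> subscribedTopics) {
        return owned.stream()
                .filter(tp -> !subscribedTopics.contains(tp.substring(0, tp.lastIndexOf('-'))))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> owned = new HashSet<>(Arrays.asList("orders-0", "orders-1", "audit-0"));
        Set<String> subscribed = new HashSet<>(Arrays.asList("orders"));
        System.out.println(revoked(owned, subscribed)); // only audit-0 is revoked
    }
}
```

Under EAGER, by contrast, every owned partition is revoked up front, which is why cooperative rebalancing lets unaffected partitions keep being consumed.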

Handling the JoinGroup response: JoinGroupResponseHandler

In the JoinGroupResponseHandler, the response tells the client whether it is the group's leader. If it is, it computes the partition assignment for the group's members and sends a SyncGroup request carrying that assignment to the coordinator; if it is not, it sends a SyncGroup request with no assignment information.

public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
    Errors error = joinResponse.error();
    if (error == Errors.NONE) { // no error
        log.debug("Received successful JoinGroup response: {}", joinResponse);
        sensors.joinSensor.record(response.requestLatencyMs());

        synchronized (AbstractCoordinator.this) {
            if (state != MemberState.REBALANCING) {
                // if the consumer was woken up before a rebalance completes, we may have already left
                // the group. In this case, we do not want to continue with the sync group.
                future.raise(new UnjoinedGroupException());
            } else {
                // record the generation and member id
                AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                        joinResponse.data().memberId(), joinResponse.data().protocolName());
                // decide whether this client is the leader, then send the SyncGroupRequest;
                // the leader computes the partition assignment and sends it along to the coordinator
                if (joinResponse.isLeader()) {
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            }
        }
    } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
        log.debug("Attempt to join group rejected since coordinator {} is loading the group.", coordinator());
        // backoff and retry
        future.raise(error);
    } else if (error == Errors.UNKNOWN_MEMBER_ID) {
        // reset the member id and retry immediately
        resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);
        log.debug("Attempt to join group failed due to unknown member id.");
        future.raise(error);
    } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
            || error == Errors.NOT_COORDINATOR) {
        // re-discover the coordinator and retry with backoff
        markCoordinatorUnknown();
        log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message());
        future.raise(error);
    } else if (error == Errors.FENCED_INSTANCE_ID) {
        log.error("Received fatal exception: group.instance.id gets fenced");
        future.raise(error);
    } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
            || error == Errors.INVALID_SESSION_TIMEOUT
            || error == Errors.INVALID_GROUP_ID
            || error == Errors.GROUP_AUTHORIZATION_FAILED
            || error == Errors.GROUP_MAX_SIZE_REACHED) {
        // log the error and re-throw the exception
        log.error("Attempt to join group failed due to fatal error: {}", error.message());
        if (error == Errors.GROUP_MAX_SIZE_REACHED) {
            future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
                    " already has the configured maximum number of members."));
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
        } else {
            future.raise(error);
        }
    } else if (error == Errors.UNSUPPORTED_VERSION) {
        log.error("Attempt to join group failed due to unsupported version error. Please unset field group.instance.id and retry" +
                "to see if the problem resolves");
        future.raise(error);
    } else if (error == Errors.MEMBER_ID_REQUIRED) {
        // Broker requires a concrete member id to be allowed to join the group. Update member id
        // and send another join group request in next cycle.
        synchronized (AbstractCoordinator.this) {
            AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID,
                    joinResponse.data().memberId(), null);
            AbstractCoordinator.this.resetStateAndRejoin();
        }
        future.raise(error);
    } else {
        // unexpected error, throw the exception
        log.error("Attempt to join group failed due to unexpected error: {}", error.message());
        future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
    }
}

References

KAFKA源码走读-COORDINATOR