Kafka can be split into three major parts: the Kafka broker, the producer, and the consumer. The producer logic is comparatively simple; its main job is batching outgoing messages so as to reduce the number of network round trips.

For the producer, the thing to focus on is its threading model, or put differently, its network model.

Let's start with a simple producer example.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageProduct {

    static String KAFKA_SERVERS = "192.168.56.10:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_SERVERS);
        // legacy settings such as metadata.broker.list and zk.connect are ignored
        // by the Java producer; bootstrap.servers is all it needs
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 30; i++) {
            producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

The Producer object sends messages through its send method; the rest of this post unpacks KafkaProducer.
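Note first that send is asynchronous: it returns a Future<RecordMetadata> and optionally takes a Callback. Continuing the example above, a small usage sketch (the error handling here is illustrative only):

// Async send with a callback; the callback runs on the producer's I/O thread.
producer.send(new ProducerRecord<>("test", "key", "value"), (metadata, exception) -> {
    if (exception != null)
        exception.printStackTrace();   // delivery failed after all retries
    else
        System.out.printf("sent to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
});

// Blocking send: get() waits for the broker's acknowledgment
// (get() throws checked exceptions that the caller must handle).
RecordMetadata meta = producer.send(new ProducerRecord<>("test", "key", "value")).get();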

KafkaProducer

Constructor

The constructor mostly initializes configuration; here we only care about the parts related to message sending.

public KafkaProducer(final Map<String, Object> configs) {
    this(configs, null, null, null, null, null, Time.SYSTEM);
}

KafkaProducer(Map<String, Object> configs,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors interceptors,
              Time time) {
    ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
            valueSerializer));
    try {
        // ... configuration code omitted

        this.sender = newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

Here we meet Sender, the core object for message sending. It is wrapped in a KafkaThread and runs on its own I/O thread.
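KafkaThread itself is just a thin wrapper over Thread; conceptually it adds little more than a fixed name, daemon status, and an uncaught-exception logger, roughly like this simplified, hypothetical sketch (not the actual source):

// Simplified sketch of what a KafkaThread-style wrapper provides.
public class DaemonThreadSketch extends Thread {
    public DaemonThreadSketch(String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        setDaemon(daemon);   // daemon: the JVM may exit without waiting for this thread
        setUncaughtExceptionHandler((t, e) ->
                System.err.println("Uncaught exception in " + t.getName() + ": " + e));
    }
}

newSender below shows how the Sender and its network client are assembled: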

Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
    int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null);
    int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
    ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time);
    ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
    Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
    KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
            new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            metadata,
            clientId,
            maxInflightRequests,
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            requestTimeoutMs,
            ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);
    int retries = configureRetries(producerConfig, transactionManager != null, log);
    short acks = configureAcks(producerConfig, transactionManager != null, log);
    return new Sender(logContext,
            client,
            metadata,
            this.accumulator,
            maxInflightRequests == 1,
            producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
            acks,
            retries,
            metricsRegistry.senderMetrics,
            time,
            requestTimeoutMs,
            producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
            this.transactionManager,
            apiVersions);
}

When reading how Sender is built, pay attention to the KafkaClient implementation. The default is NetworkClient, which is the low-level networking layer of the producer.

Sending messages: send

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

send runs the producer interceptors; doSend performs the actual send logic.
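For reference, interceptors implement org.apache.kafka.clients.producer.ProducerInterceptor and are registered through the interceptor.classes property. A minimal, hypothetical sketch:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that tags every value; enable it with
// props.put("interceptor.classes", PrefixInterceptor.class.getName());
public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // runs on the caller's thread, before serialization and partitioning
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                "tagged-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // runs when the send is acknowledged or fails (on the I/O thread)
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}

Back to the main path: doSend is where the real work happens.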

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // key/value serialization
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // work out the target topic partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // wrap the user callback
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        // the core of the send: hand the record to the record accumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true);
        // the append did not succeed because a new batch would have to be created
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            // this calls the partitioner's onNewBatch;
            // with a StickyPartitionCache-based partitioner this is where nextPartition updates the sticky partition
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            // re-partition
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            // retry the append
            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

doSend is long, but its actual logic is straightforward:

  1. Make sure the producer is still running;
  2. Make sure metadata for the topic is available (fetching/updating it if needed);
  3. Serialize the key and value;
  4. Work out the target topic partition;
  5. Wrap the callback;
  6. Hand the record to the record accumulator;
  7. Handle interruptions and exceptions.

Two of these steps are the core: determining the target partition, and handing the record to the accumulator.

Determining the target partition: partition

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

The partition is determined in one of two ways:

  1. the message itself (ProducerRecord) carries a partition (see the snippet just below);
  2. a partitioner chooses one.

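In the first case the caller has already picked the partition, as the opening example does:

// Partition 0 is fixed by the caller, so the partitioner is never consulted.
ProducerRecord<String, String> record = new ProducerRecord<>("test", 0, "key", "value");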
There are three built-in partitioning strategies:

  1. hash partitioning;
  2. round-robin partitioning;
  3. sticky partitioning.

The Partitioner interface

public interface Partitioner extends Configurable, Closeable {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    public void close();

    default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}
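Implementing the interface directly is also an option; a hypothetical sketch that hashes keyed records and pins keyless ones to partition 0, registered via the partitioner.class property:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hypothetical partitioner, for illustration only.
public class KeyOrZeroPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null)
            return 0;   // all keyless records go to partition 0
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override public void close() {}

    @Override public void configure(Map<String, ?> configs) {}
}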

The hash partitioner: DefaultPartitioner

public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // no key: fall back to sticky partitioning
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // fetch the topic's partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the key, then take it modulo the partition count
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}

    /**
     * If a batch completed for the current sticky partition, change the sticky partition.
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

The round-robin partitioner: RoundRobinPartitioner

public class RoundRobinPartitioner implements Partitioner {
    // one counter per topic
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        // some partitions are available
        if (!availablePartitions.isEmpty()) {
            // counter modulo the number of available partitions
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {}

}
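RoundRobinPartitioner is not the default; switching to it is a one-line configuration change (a sketch, reusing props from the opening example):

// DefaultPartitioner is used when partitioner.class is unset.
props.put("partitioner.class", "org.apache.kafka.clients.producer.RoundRobinPartitioner");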

The sticky partition cache: StickyPartitionCache

Sticky partitioning: keep sending to the same partition for a while (until the current batch completes), then switch.

The sticky partition is updated in partitioner.onNewBatch.

public class StickyPartitionCache {
    // topic -> sticky partition
    private final ConcurrentMap<String, Integer> indexCache;
    public StickyPartitionCache() {
        this.indexCache = new ConcurrentHashMap<>();
    }

    public int partition(String topic, Cluster cluster) {
        // the most recently assigned topic->partition mapping stays cached
        Integer part = indexCache.get(topic);
        if (part == null) {
            // nothing cached: compute an assignment
            return nextPartition(topic, cluster, -1);
        }
        return part;
    }

    public int nextPartition(String topic, Cluster cluster, int prevPartition) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        Integer oldPart = indexCache.get(topic);
        Integer newPart = oldPart;
        // nothing assigned yet, or the cached value matches the previous partition
        if (oldPart == null || oldPart == prevPartition) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() < 1) {
                // no available partitions: pick any partition at random
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = random % partitions.size();
            } else if (availablePartitions.size() == 1) {
                newPart = availablePartitions.get(0).partition();
            } else {
                // loop in case another thread has already assigned one
                while (newPart == null || newPart.equals(oldPart)) {
                    // pick a random available partition
                    Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                    newPart = availablePartitions.get(random % availablePartitions.size()).partition();
                }
            }
            // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
            if (oldPart == null) {
                indexCache.putIfAbsent(topic, newPart);
            } else {
                indexCache.replace(topic, prevPartition, newPart);
            }
            return indexCache.get(topic);
        }
        return indexCache.get(topic);
    }

}

The record accumulator: RecordAccumulator

append

public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // fetch the deque of batches for this partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // try appending to an existing batch
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        // if the caller asked to abort when a new batch is needed, stop here
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // allocate a buffer
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // try the append again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
            // create a new ProducerBatch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

The accumulator's append logic:

  1. Increment the in-progress append counter;
  2. Look up the Deque of batches for the topic partition;
  3. With the Deque locked:
    1. Use tryAppend to add the record to the batch at the tail of the queue;
    2. If that succeeds, return;
  4. If abortOnNewBatch is true, return early (the caller will re-invoke append);
  5. Allocate a new buffer;
  6. With the Deque locked again:
    1. Try tryAppend once more (another thread may have created a ProducerBatch in the meantime);
    2. If that succeeds, return;
    3. Create a new ProducerBatch;
    4. Append the record to the freshly created ProducerBatch;
    5. Add the ProducerBatch to the tail of the queue;
  7. In the finally block, decrement the in-progress append counter.

In essence, the accumulator makes sending asynchronous (decoupling it from I/O): send merely stores the record into a queue, while the I/O thread (Sender) actually pulls records off the queues and ships them.

Besides the asynchrony, it also optimizes transmission. Each topic partition has its own queue, and the queue stores ProducerBatch objects, which are exactly what the name says: batches of records. The I/O thread therefore does not send one record at a time but whole batches, cutting down network overhead.

Queued records are sent when one of two things happens: the ProducerBatch fills up, or it has sat in the queue for the maximum allowed time (see the config sketch below).
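Both triggers are configurable on the producer; a sketch of the two relevant settings (the values here are illustrative, not recommendations):

// batch.size caps the bytes per ProducerBatch buffer (default 16384).
props.put("batch.size", 32 * 1024);
// linger.ms lets the sender wait for more records before draining a batch (default 0).
props.put("linger.ms", 10);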

Getting the queue: getOrCreateDeque

private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    // look up the queue for this topic partition
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
  1. Look up the queue for the topic partition;
  2. If one exists, return it;
  3. Otherwise create an ArrayDeque;
  4. Store it in batches with putIfAbsent and return whichever deque won the race (see the sketch below).
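The get-then-putIfAbsent sequence is the classic lock-free get-or-create idiom: the loser of a race discards its freshly created deque and adopts the winner's. Assuming batches is a ConcurrentMap, a functionally equivalent one-liner would be:

// Equivalent get-or-create in a single call (sketch).
Deque<ProducerBatch> dq = batches.computeIfAbsent(tp, k -> new ArrayDeque<>());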

Trying to append a record: tryAppend

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
    }
    return null;
}
  1. Peek at the ProducerBatch at the tail of the queue;
  2. If the tail is not null:
    1. Call the batch's tryAppend to insert the record;
    2. If that returns null, call closeForRecordAppends() to close the batch's append stream; otherwise return the result;
  3. Return null.

ProducerBatch.tryAppend

public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // check whether there is enough room left
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                timestamp, checksum,
                key == null ? -1 : key.length,
                value == null ? -1 : value.length,
                Time.SYSTEM);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
  1. If there is not enough room, return null;
  2. Append through recordsBuilder (writing into the append stream);
  3. Build the FutureRecordMetadata to return;
  4. Register the callback (as a Thunk);
  5. Increment recordCount;
  6. Return the FutureRecordMetadata.

ProducerBatch.closeForRecordAppends

public void closeForRecordAppends() {
    recordsBuilder.closeForRecordAppends();
}

org.apache.kafka.common.record.MemoryRecordsBuilder#closeForRecordAppends

public void closeForRecordAppends() {
    if (appendStream != CLOSED_STREAM) {
        try {
            appendStream.close();
        } catch (IOException e) {
            throw new KafkaException(e);
        } finally {
            appendStream = CLOSED_STREAM;
        }
    }
}

Message sending: Sender

As seen in the KafkaProducer constructor, a Sender is created and started on its own thread. Sender is responsible for actually sending messages; it is the I/O thread.

KafkaProducer(Map<String, Object> configs,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors interceptors,
              Time time) {
    ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer,
            valueSerializer));
    try {
        // ...
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        // ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

Sender implements Runnable, so we can go straight to its run() method.

public class Sender implements Runnable {}

run

The heart of run() is the endless invocation of runOnce(); most of the remaining code deals with shutdown, so it can be ignored for now.

public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            // do one round of sending
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // everything below handles shutdown
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

runOnce()

void runOnce() {
    // transaction handling; can be ignored for now
    if (transactionManager != null) {
        try {
            transactionManager.resetProducerIdIfNeeded();

            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(
                        new KafkaException("The client hasn't received acknowledgment for " +
                                "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (maybeSendAndPollTransactionalRequest()) {
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long currentTimeMs = time.milliseconds();
    // prepare the data that can be sent
    long pollTimeout = sendProducerData(currentTimeMs);
    // do the actual network I/O
    client.poll(pollTimeout, currentTimeMs);
}
  1. The opening if block handles transactions and can be ignored for now;
  2. Get the current time in milliseconds;
  3. sendProducerData prepares the data to send;
  4. client.poll performs the actual sending.

The default implementation of client is NetworkClient.

Preparing the data to send: sendProducerData

private long sendProducerData(long now) {
    // fetch cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if the leader of some topic is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);

        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    // e.g. the connection is not established yet
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // node is not ready yet
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    // create produce requests
    // group the batches to send per node
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // record them as in-flight batches (sent but not yet acknowledged);
    // the in-flight set also throttles how much can be outstanding
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        // a muted partition is not drained again until its in-flight batch resolves
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // reset the next batch expiry time
    accumulator.resetNextBatchExpiryTime();
    // collect in-flight batches that timed out
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    // collect batches that expired in the accumulator
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    // log and fail the expired batches
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }
    sensors.updateProduceRequestMetrics(batches);

    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);
    return pollTimeout;
}
  1. Fetch cluster metadata;
  2. Ask the accumulator for the data that is ready to send;
  3. Record the drained batches as in-flight batches (sent but not yet acknowledged);
  4. If guaranteeMessageOrder is true, mute the drained partitions so nothing more is sent to them for the time being;
  5. Reset the next batch expiry time;
  6. Collect in-flight batches that have timed out;
  7. Collect expired batches from the accumulator;
  8. Fail the expired batches via failBatch and handle the transactional bookkeeping;
  9. Update metrics;
  10. Call sendProduceRequests to send.

The core of sendProduceRequests is wrapping the ProducerBatch objects into a request and then calling NetworkClient's send method.

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;
    // group by topic partition
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // the destination node id
    String nodeId = Integer.toString(destination);
    // wrap everything into a ClientRequest
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    // hand the request to the client
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}

NetworkClient

send

@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ensureActive();
    // the destination node id
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // If this request came from outside the NetworkClient, validate
        // that we can send data. If the request is internal, we trust
        // that internal code has done this validation. Validation
        // will be slightly different for some internal requests (for
        // example, ApiVersionsRequests can be sent prior to being in
        // READY state.)
        if (!canSendRequest(nodeId, now))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                    builder.latestAllowedVersion());
        }
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.
        // Instead, simply add it to the local queue of aborted requests.
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, unsupportedVersionException, null, null);

        if (!isInternalRequest)
            abortedSends.add(clientResponse);
        else if (clientRequest.apiKey() == ApiKeys.METADATA)
            metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
    }
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), destination);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), destination);
        }
    }
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(
            clientRequest,
            header,
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    selector.send(send);
}

Selector

public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
    } else {
        try {
            channel.setSend(send);
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            channel.state(ChannelState.FAILED_SEND);
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                        connectionId, e);
                throw e;
            }
        }
    }
}

KafkaChannel

setSend

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

write()

setSend only stages the data to be sent; the actual write happens in write().

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

The call chain that leads to write():

Sender#runOnce
NetworkClient#poll
Selector#poll
Selector#pollSelectionKeys
KafkaChannel#write()

which loops us back to Sender#runOnce.

In short, setSend stages the data, and write() performs the actual send.
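This setSend/write split mirrors the standard java.nio pattern: stage the payload, register interest in OP_WRITE, then write when the selector reports the channel writable. A minimal standalone sketch of that pattern (plain NIO, not Kafka code):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class WriteInterestSketch {
    // Stage the data and express write interest -- the analogue of setSend.
    static void setSend(SelectionKey key, ByteBuffer payload) {
        key.attach(payload);
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // Drain the staged data when the channel is writable -- the analogue of write().
    static void pollOnce(Selector selector) throws IOException {
        selector.select();
        for (SelectionKey key : selector.selectedKeys()) {
            if (key.isWritable()) {
                ByteBuffer payload = (ByteBuffer) key.attachment();
                ((SocketChannel) key.channel()).write(payload);
                if (!payload.hasRemaining())   // fully flushed: stop watching OP_WRITE
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            }
        }
        selector.selectedKeys().clear();
    }
}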