public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
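The `interceptors.onSend()` call above is the hook where user-supplied `ProducerInterceptor`s run, before serialization and partitioning. A minimal sketch of one (the class name and the `trace-id` header are illustrative, not part of the code above); it would be registered through the `interceptor.classes` config:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TracingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Invoked from send() before doSend(); the record returned here is what gets
        // serialized, partitioned and appended. "trace-id" is just an illustrative header.
        record.headers().add("trace-id", UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called once the broker acknowledges the record, or when the send fails.
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

Exceptions thrown from `onSend()` are caught and logged by the interceptor wrapper rather than propagated, which is why the comment above notes that this call does not throw.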
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
        compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
if (log.isTraceEnabled()) {
    log.trace("Attempting to append record {} with callback {} to topic {} partition {}",
            record, callback, record.topic(), partition);
}
// wrap the user callback together with the interceptors
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
    transactionManager.failIfNotReadyForSend();
}
// core of the send path: hand the record over to the RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs, true);
// the append did not succeed because a new batch would have to be created
if (result.abortForNewBatch) {
    int prevPartition = partition;
    // the partitioner's onNewBatch is invoked here; if it uses a StickyPartitionCache,
    // this is usually where nextPartition updates the sticky partition
    partitioner.onNewBatch(record.topic(), cluster, prevPartition);
    // re-pick the partition
    partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);
    if (log.isTraceEnabled()) {
        log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}",
                record.topic(), partition, prevPartition);
    }
    // producer callback will make sure to call both 'callback' and interceptor callback
    interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
    // retry the append
    result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers,
            interceptCallback, remainingWaitMs, false);
}
if (transactionManager != null && transactionManager.isTransactional())
    transactionManager.maybeAddPartitionToTransaction(tp);
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
                record.topic(), partition);
        this.sender.wakeup();
    }
    return result.future;
    // handling exceptions and record the errors;
    // for API exceptions return them in the future,
    // for other exceptions throw directly
} catch (ApiException e) {
    log.debug("Exception occurred during message send:", e);
    if (callback != null)
        callback.onCompletion(null, e);
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    return new FutureFailure(e);
} catch (InterruptedException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw new InterruptException(e);
} catch (BufferExhaustedException e) {
    this.errors.record();
    this.metrics.sensor("buffer-exhausted-records").record();
    this.interceptors.onSendError(record, tp, e);
    throw e;
} catch (KafkaException e) {
    this.errors.record();
    this.interceptors.onSendError(record, tp, e);
    throw e;
} catch (Exception e) {
    // we notify interceptor about all exceptions, since onSend is called before anything else in this method
    this.interceptors.onSendError(record, tp, e);
    throw e;
}
}
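From the caller's side this split matters: ApiExceptions are reported through the callback and the returned future, while InterruptException, BufferExhaustedException and other exceptions are thrown straight out of send(). A minimal usage sketch, assuming a broker at `localhost:9092` and a made-up `demo-topic`:

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            // ApiExceptions show up here (and in the returned future), not as exceptions from send()
            Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();
            });
            // blocking get(): an ApiException would surface wrapped in an ExecutionException
            RecordMetadata metadata = future.get();
            System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
}
```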
public void close() {}

/**
 * If a batch completed for the current sticky partition, change the sticky partition.
 * Alternately, if no sticky partition has been determined, set one.
 */
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}
}
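onNewBatch() is part of the public Partitioner interface (configured via `partitioner.class`), so custom partitioners receive the same callback from doSend(). The sketch below is a rough illustration of the sticky idea, not the real DefaultPartitioner; the class name and the simple round-robin advance are assumptions made for the example:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SimpleStickyPartitioner implements Partitioner {

    private final Map<String, Integer> stickyPartition = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes != null) {
            // keyed records: stable hash, same spirit as the default partitioner
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
        // keyless records: stick to one partition until onNewBatch() moves it
        return stickyPartition.computeIfAbsent(topic, t -> ThreadLocalRandom.current().nextInt(numPartitions));
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        // Called from doSend() when the append was aborted because a new batch was needed:
        // advance the sticky partition so the next batch fills on a different partition.
        int numPartitions = cluster.partitionsForTopic(topic).size();
        stickyPartition.put(topic, (prevPartition + 1) % numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```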
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock,
                                 boolean abortOnNewBatch) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    // count this thread as an in-progress append
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null)
        headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // get (or create) the deque of batches for this partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // try to append to the last batch in the deque
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }
        // we don't have an in-progress record batch try to allocate a new batch
        // if the caller asked to abort instead of creating a new batch, stop here
        if (abortOnNewBatch) {
            // Return a result that will cause another call to append.
            return new RecordAppendResult(null, false, false, true);
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // allocate a buffer from the BufferPool
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            // try the append again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // create a new ProducerBatch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
                    callback, time.milliseconds()));
            dq.addLast(batch);
            incomplete.add(batch);
            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    // check whether the current batch still has room for this record
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length,
                                                               Time.SYSTEM);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
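How much fits into one batch before hasRoomFor() returns false, and how long append()/free.allocate() may block waiting for buffer space, are controlled by a handful of producer configs. A sketch with illustrative values only (the defaults are roughly batch.size=16384, linger.ms=0, buffer.memory=32 MB, max.block.ms=60000):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        // rough cap for one ProducerBatch; hasRoomFor() returns false once a record would exceed it
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        // how long a non-full batch may wait before the Sender drains it
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        // total size of the BufferPool that free.allocate() draws from
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);
        // how long append()/allocate() may block when the pool is exhausted
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30_000);
        return props;
    }
}
```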
### run

The core of run() is simply calling runOnce() in a loop; most of the remaining code handles shutdown, so it can be set aside for now.

```java
public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    // main loop, runs until close is called
    while (running) {
        try {
            // one iteration of the send loop
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // everything from here on is shutdown handling
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    // okay we stopped accepting requests but there may still be
    // requests in the transaction manager, accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
    while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
        if (!transactionManager.isCompleting()) {
            log.info("Aborting incomplete transaction due to shutdown");
            transactionManager.beginAbort();
        }
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {
        // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
        // the futures.
        if (transactionManager != null) {
            log.debug("Aborting incomplete transactional requests due to forced shutdown");
            transactionManager.close();
        }
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
log.debug("Shutdown of Kafka producer I/O thread has completed."); }
if (!transactionManager.isTransactional()) {
    // this is an idempotent producer, so make sure we have a producer id
    maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
    transactionManager.transitionToFatalError(
        new KafkaException("The client hasn't received acknowledgment for " +
            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (maybeSendAndPollTransactionalRequest()) {
    return;
}
    // do not continue sending if the transaction manager is in a failed state or if there
    // is no producer id (for the idempotent case).
    if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
        RuntimeException lastError = transactionManager.lastError();
        if (lastError != null)
            maybeAbortBatches(lastError);
        client.poll(retryBackoffMs, time.milliseconds());
        return;
    } else if (transactionManager.hasAbortableError()) {
        accumulator.abortUndrainedBatches(transactionManager.lastError());
    }
} catch (AuthenticationException e) {
    // This is already logged as error, but propagated here to perform any clean ups.
    log.trace("Authentication exception while processing transactional request: {}", e);
    transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
// hand any sendable batches to the network client
long pollTimeout = sendProducerData(currentTimeMs);
// perform the actual network I/O
client.poll(pollTimeout, currentTimeMs);
}
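The transactional branches in runOnce() only come into play when the producer is configured with a transactional.id. A minimal usage sketch following the usual commit/abort pattern; the broker address, topic and transactional id are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxnDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // transactional.id implies idempotence, which is why runOnce() must first obtain a producer id
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // fatal errors: the transaction manager is in a fatal state, the only option is to close
            producer.close();
        } catch (KafkaException e) {
            // abortable error: the state handled above by accumulator.abortUndrainedBatches()
            producer.abortTransaction();
        }
        producer.close();
    }
}
```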
private long sendProducerData(long now) {
    // fetch the current cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // i.e. which partitions have batches that can be sent right now
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if the leader for some topics is unknown, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
    // remove any nodes we aren't ready to send to,
    // e.g. because the connection has not been established yet
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // this node cannot accept requests yet
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }
    // create produce requests
    // group the drained batches by destination node id
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // track these as in-flight batches (sent but not yet acknowledged);
    // the in-flight tracking also bounds how much can be outstanding at once
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        // mute the drained partitions; nothing more is sent to them until the response comes back
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }
    // reset the next batch expiry time
    accumulator.resetNextBatchExpiryTime();
    // collect in-flight batches that have exceeded their delivery timeout
    List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
    // collect batches that expired while still sitting in the accumulator
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);
    // fail and log the expired batches
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }
    sensors.updateProduceRequestMetrics(batches);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
    // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
    // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
    // that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }
    sendProduceRequests(batches, now);
    return pollTimeout;
}
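Several of the behaviours in sendProducerData() map directly to producer configs. A sketch with illustrative values; in particular, guaranteeMessageOrder is true in the Sender exactly when max.in.flight.requests.per.connection is 1, and the expiry loop above enforces delivery.timeout.ms:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ReliabilityConfig {
    static Properties reliabilityProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        // with a single in-flight request per connection, the Sender is built with
        // guaranteeMessageOrder = true and mutes each drained partition as shown above
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // batches still unacknowledged past this deadline are failed with the
        // TimeoutException constructed in the expiry loop above
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        // maxRequestSize, the per-node cap passed to accumulator.drain()
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1_048_576);
        return props;
    }
}
```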
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    // aggregate the batches by topic-partition
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    // find the minimum magic version used when creating the record sets
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();
        // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
        // that the producer starts building the batch and the time that we send the request, and we may have
        // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
        // the new message format, but found that the broker didn't support it, so we need to down-convert on the
        // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
        // not all support the same message format version. For example, if a partition migrates from a broker
        // which is supporting the new magic version to one which doesn't, then we will need to convert.
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }
@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    ensureActive();
    // destination node id of the request
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // If this request came from outside the NetworkClient, validate
        // that we can send data. If the request is internal, we trust
        // that internal code has done this validation. Validation
        // will be slightly different for some internal requests (for
        // example, ApiVersionsRequests can be sent prior to being in
        // READY state.)
        if (!canSendRequest(nodeId, now))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        // Note: if versionInfo is null, we have no server version information. This would be
        // the case when sending the initial ApiVersionRequest which fetches the version
        // information itself. It is also the case when discoverBrokerVersions is set to false.
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                    "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
                builder.latestAllowedVersion());
        }
        // The call to build may also throw UnsupportedVersionException, if there are essential
        // fields that cannot be represented in the chosen version.
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException unsupportedVersionException) {
        // If the version is not supported, skip sending the request over the wire.
        // Instead, simply add it to the local queue of aborted requests.
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
            clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
            clientRequest.callback(), clientRequest.destination(), now, now,
            false, unsupportedVersionException, null, null);
public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
        this.failedSends.add(connectionId);
    } else {
        try {
            channel.setSend(send);
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            channel.state(ChannelState.FAILED_SEND);
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            this.failedSends.add(connectionId);
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                    connectionId, e);
                throw e;
            }
        }
    }
}
### KafkaChannel

#### setSend
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
#### write()

setSend only stages the data to be sent; the actual network write happens in write().
public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
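For readers less used to java.nio, the same staging pattern can be shown outside Kafka. The sketch below is not Kafka code: stage() plays the role of setSend() (remember the buffer, turn on OP_WRITE), and writeIfReady() plays the role of write(), called from the selector loop once the channel is reported writable:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

class PendingSend {
    private final SocketChannel channel;
    private final SelectionKey key;
    private ByteBuffer pending; // the staged "send"

    PendingSend(SocketChannel channel, SelectionKey key) {
        this.channel = channel;
        this.key = key;
    }

    void stage(ByteBuffer buffer) {
        if (pending != null)
            throw new IllegalStateException("previous send still in progress");
        pending = buffer;
        // like transportLayer.addInterestOps(SelectionKey.OP_WRITE)
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
    }

    // called from the selector loop when the key is writable
    boolean writeIfReady() throws IOException {
        if (pending == null)
            return false;
        channel.write(pending);
        if (!pending.hasRemaining()) {
            // fully flushed: clear the staged send and stop asking for OP_WRITE
            pending = null;
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            return true;
        }
        return false; // partial write; keep OP_WRITE set and try again next poll
    }
}
```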