配置

Broker Config

broker端可以什么都不配置即可使用。

  1. ransactional.id.timeout.ms:

在ms中,事务协调器在生产者TransactionalId提前过期之前等待的最长时间,并且没有从该生产者TransactionalId接收到任何事务状态更新。默认是604800000(7天)。这允许每周一次的生产者作业维护它们的id

  1. max.transaction.timeout.ms

事务允许的最大超时。如果客户端请求的事务时间超过此时间,broke将在InitPidRequest中返回InvalidTransactionTimeout错误。这可以防止客户机超时过大,从而导致用户无法从事务中包含的主题读取内容。
默认值为900000(15分钟)。这是消息事务需要发送的时间的保守上限。

  1. transaction.state.log.replication.factor

事务状态topic的副本数量。默认值:3

  1. transaction.state.log.num.partitions

事务状态主题的分区数。默认值:50

  1. transaction.state.log.min.isr

事务状态主题的每个分区ISR最小数量。默认值:2

  1. transaction.state.log.segment.bytes

事务状态主题的segment大小。默认值:104857600字节

Producer Config

  1. enable.idempotence:

开启幂等

  1. transaction.timeout.ms

事务超时时间
事务协调器在主动中止正在进行的事务之前等待生产者更新事务状态的最长时间。
这个配置值将与InitPidRequest一起发送到事务协调器。如果该值大于max.transaction.timeout。在broke中设置ms时,请求将失败,并出现InvalidTransactionTimeout错误。
默认是60000。这使得交易不会阻塞下游消费超过一分钟,这在实时应用程序中通常是允许的。

  1. transactional.id:

用于事务性交付的TransactionalId。这支持跨多个生产者会话的可靠性语义,因为它允许客户端确保使用相同TransactionalId的事务在启动任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等交付。

Consumer Config

  1. isolation.level
  • read_uncommitted:以偏移顺序使用已提交和未提交的消息。
  • read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。

例子

这个例子的核心是观察事务开启对其它没有开启事务消息的影响,即消费者消费的顺序。

Producer

一个生产者开启事务发消息,另一个生产者不开启事务写消息,可以观察消费者获取这些消息的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class MessageTransactionProduct {

static String KAFKA_SERVERS = "192.168.56.10:9092";

public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
//props.put("metadata.broker.list", KAFKA_SERVERS);
props.put("zk.connect", "192.168.56.10:2181/kafka,192.168.56.10:2182/kafka,192.168.56.10:2183/kafka");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", true);
props.put("transactional.id", "1");
props.put("transaction.timeout.ms", "10000");


Producer<String, String> producer = new KafkaProducer<>(props);
Producer<String, String> producer2 = init(2);
//producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(-1), Integer.toString(-1)));
producer.initTransactions();
producer.beginTransaction();
System.out.println(System.currentTimeMillis());
try {
for (int i = 121; i < 130; i++) {
producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
}
Thread.sleep(2000);
System.out.println(System.currentTimeMillis());
// 模拟另一个生产者写入消息
producer2.send(new ProducerRecord<String, String>("test", 0, Integer.toString(-2), Integer.toString(-2)));
Thread.sleep(2000);
System.out.println(System.currentTimeMillis());
for (int i = 131; i < 160; i++) {
producer.send(new ProducerRecord<String, String>("test", 0, Integer.toString(i), Integer.toString(i)));
}
}finally {
producer.commitTransaction();
producer.close();
}
}

private static Producer<String, String> init(int id){
Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_SERVERS);
//props.put("metadata.broker.list", KAFKA_SERVERS);
props.put("zk.connect", "192.168.56.10:2181/kafka,192.168.56.10:2182/kafka,192.168.56.10:2183/kafka");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", false);
//props.put("transactional.id", id+"");
props.put("transaction.timeout.ms", "10000");

return new KafkaProducer<>(props);
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class MessageConsumer {

public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", MessageProduct.KAFKA_SERVERS);
props.put("client.id", "1");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("isolation.level", "read_committed");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(System.currentTimeMillis()+" offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}

观察消费者的消费结果可以看到:

  1. 没有事务的消息如果在事务消息开始之前发送:消费者能先消费到没有事务的消息;
  2. 没有事务的消息如果在事务消息开始之后发送:消费者要等到事务完成/中断之后才能消费到没有开启事务的消息。

Producer事务原理

官方的Data Flow


事务和消费者组有些类似,有一个Broker作为事务协调器(Transaction Coordinator)。生产者对应的事务协调器是根据transaction_state主题的分区确定,生产者的TransactionalId的哈希值对transaction_state主题分区数取余对应的分区即为对应的事务协调器节点。

事务流程

部分修改的Data Flow

####
安装事务的流程画了一幅序列图,更好的展示请求的流程。
kafka事务生产者.png

事务初始化initTransactions()

  1. 查找事务协调器节点

Transaction Producer 会向 Broker (随机选择一台 broker,一般选择本地连接最少的这台 broker)发送 FindCoordinatorRequest 请求,获取其 TransactionCoordinator。

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

  1. 获取PID

Producer ID
Transaction Producer 在 initializeTransactions() 方法中会向 TransactionCoordinator 发送 InitPidRequest 请求获取其分配的 PID,有了 PID,事务写入时可以保证幂等性,PID 如何分配可以参考 PID 分配,但是 TransactionCoordinator 在给事务 Producer 分配 PID 会做一些判断,主要的内容是:

  1. 如果这个 txn.id 之前没有相应的事务状态(new txn.id),那么会初始化其事务 meta 信息 TransactionMetadata(会给其分配一个 PID,初始的 epoch 为-1),如果有事务状态,获取之前的状态;
  2. 校验其 TransactionMetadata 的状态信息(参考下面代码中 prepareInitProduceIdTransit() 方法):
    1. 如果前面还有状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常;
    2. 如果此时的状态为 PrepareAbort 或 PrepareCommit,返回 CONCURRENT_TRANSACTIONS 异常;
    3. 如果之前的状态为 CompleteAbort、CompleteCommit 或 Empty,那么先将状态转移为 Empty,然后更新一下 epoch 值;
    4. 如果之前的状态为 Ongoing,状态会转移成 PrepareEpochFence,然后再 abort 当前的事务,并向 client 返回 CONCURRENT_TRANSACTIONS 异常;
    5. 如果状态为 Dead 或 PrepareEpochFence,直接抛出相应的 FATAL 异常;
  3. 将 txn.id 与相应的 TransactionMetadata 持久化到事务日志中,对于 new txn.id,这个持久化的数据主要时 txn.id 与 pid 关系信息,如图中的 3a 所示。

开始事务beginTransaction()

前面两步都是 Transaction Producer 调用 initTransactions() 部分,到这里,Producer 可以调用 beginTransaction() 开始一个事务操作,其实现方法如下面所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//KafkaProducer
//note: 应该在一个事务操作之前进行调用
public void beginTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
transactionManager.beginTransaction();
}

// TransactionManager
//note: 在一个事务开始之前进行调用,这里实际上只是转换了状态(只在 producer 本地记录了状态的开始)
public synchronized void beginTransaction() {
ensureTransactional();
maybeFailWithError();
transitionTo(State.IN_TRANSACTION);
}

这里只是将本地事务状态转移成 IN_TRANSACTION,并没有与 Server 端进行交互,所以在流程图中没有体现出来(TransactionManager 初始化时,其状态为 UNINITIALIZED,Producer 调用 initializeTransactions() 方法,其状态转移成 INITIALIZING)。

事务过程

这个过程中就是正常的发送消息即可。

如果有比较典型的场景:先通过Consumer获取消息,然后处理数据然后通过Producer写入Topic中;这就是典型的Consume-Porcess-Produce Loop场景,对于这种场景有特殊的API: sendOffsetsToTransaction

1
2
3
4
5
6
7
8
9
10
11
12
13
while (true) {
ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
producer.beginTransaction();
//start
for (ConsumerRecord record : records){
producer.send(producerRecord(“outputTopic1”, record));
producer.send(producerRecord(“outputTopic2”, record));
}
// 这里写消费位移
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
//end
producer.commitTransaction();
}

对于事务过程/流程解析一下,序号按照上图。

4.1 AddPartitionsToTxnRequest

Producer 在调用 send() 方法时,Producer 会将这个对应的 Topic—Partition 添加到 TransactionManager 的记录中,如下所示:

1
2
3
//note: 如何开启了幂等性或事务性,需要做一些处理
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);

如果这个 Topic-Partition 之前不存在,那么就添加到 newPartitionsInTransaction 集合中,如下所示:

1
2
3
4
5
6
7
8
9
10
11
//note: 将 tp 添加到 newPartitionsInTransaction 中,记录当前进行事务操作的 tp
public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
failIfNotReadyForSend();

//note: 如果 partition 已经添加到 partitionsInTransaction、pendingPartitionsInTransaction、newPartitionsInTransaction中
if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
return;

log.debug("Begin adding new partition {} to transaction", topicPartition);
newPartitionsInTransaction.add(topicPartition);
}

Producer 端的 Sender 线程会将这个信息通过 AddPartitionsToTxnRequest 请求发送给 TransactionCoordinator,也就是图中的 4.1 过程,TransactionCoordinator 会将这个 Topic-Partition 列表更新到 txn.id 对应的 TransactionMetadata 中,并且会持久化到事务日志中,也就是图中的 4.1 a 部分,这里持久化的数据主要是 txn.id 与其涉及到的 Topic-Partition 信息。

4.2 ProduceRequest

这一步与正常 Producer 写入基本上一样,就是相应的 Leader 在持久化数据时会在头信息中标识这条数据是不是来自事务 Producer 的写入(主要是数据协议有变动,Server 处理并不需要做额外的处理)。

4.3 AddOffsetsToTxnRequest

Producer 在调用 sendOffsetsToTransaction() 方法时,第一步会首先向 TransactionCoordinator 发送相应的 AddOffsetsToTxnRequest 请求,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//class KafkaProcducer
//note: 当你需要 batch 的消费-处理-写入消息,这个方法需要被使用
//note: 发送指定的 offset 给 group coordinator,用来标记这些 offset 是作为当前事务的一部分,只有这次事务成功时
//note: 这些 offset 才会被认为 commit 了
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
sender.wakeup();
result.await();
}


// class TransactionManager
//note: 发送 AddOffsetsToTxRequest
public synchronized TransactionalRequestResult sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) {
ensureTransactional();
maybeFailWithError();
if (currentState != State.IN_TRANSACTION)
throw new KafkaException("Cannot send offsets to transaction either because the producer is not in an " +
"active transaction");

log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, consumerGroupId);
AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId,
producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, consumerGroupId);
AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets);
enqueueRequest(handler);
return handler.result;
}

TransactionCoordinator 在收到这个请求时,处理方法与 4.1 中的一样,把这个 group.id 对应的 __consumer_offsets 的 Partition (与写入涉及的 Topic-Partition 一样)保存到事务对应的 meta 中,之后会持久化相应的事务日志,如图中 4.3a 所示。

4.4 TxnOffsetsCommitRequest

Producer 在收到 TransactionCoordinator 关于 AddOffsetsToTxnRequest 请求的结果后,后再次发送 TxnOffsetsCommitRequest 请求给对应的 GroupCoordinator,AddOffsetsToTxnHandler 的 handleResponse() 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
Errors error = addOffsetsToTxnResponse.error();

if (error == Errors.NONE) {
log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId());

// note the result is not completed until the TxnOffsetCommit returns
//note: AddOffsetsToTnxRequest 之后,还会再发送 TxnOffsetCommitRequest
pendingRequests.add(txnOffsetCommitHandler(result, offsets, builder.consumerGroupId()));
transactionStarted = true;
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) {
lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
reenqueue();
} else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.CONCURRENT_TRANSACTIONS) {
reenqueue();
} else if (error == Errors.INVALID_PRODUCER_EPOCH) {
fatalError(error.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(new GroupAuthorizationException(builder.consumerGroupId()));
} else {
fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message()));
}
}

GroupCoordinator 在收到相应的请求后,会将 offset 信息持久化到 consumer offsets log 中(包含对应的 PID 信息),但是不会更新到缓存中,除非这个事务 commit 了,这样的话就可以保证这个 offset 信息对 consumer 是不可见的(没有更新到缓存中的数据是不可见的,通过接口是获取的,这是 GroupCoordinator 本身来保证的)。

事务提交/中断commitTransaction()/abortTransaction()

在一个事务操作处理完成之后,Producer 需要调用 commitTransaction() 或者 abortTransaction() 方法来 commit 或者 abort 这个事务操作。

5.1. EndTxnRequest

无论是 Commit 还是 Abort,对于 Producer 而言,都是向 TransactionCoordinator 发送 EndTxnRequest 请求,这个请求的内容里会标识是 commit 操作还是 abort 操作,Producer 的 commitTransaction()方法实现如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//class KafkaProducer
//note: commit 正在进行的事务操作,这个方法在真正发送 commit 之后将会 flush 所有未发送的数据
//note: 如果在发送中遇到任何一个不能修复的错误,这个方法抛出异常,事务也不会被提交,所有 send 必须成功,这个事务才能 commit 成功
public void commitTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginCommit();
sender.wakeup();
result.await();
}

// class TransactionManager
//note: 开始 commit,转移本地本地保存的状态以及发送相应的请求
public synchronized TransactionalRequestResult beginCommit() {
ensureTransactional();
maybeFailWithError();
transitionTo(State.COMMITTING_TRANSACTION);
return beginCompletingTransaction(TransactionResult.COMMIT);
}

Producer 的 abortTransaction() 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//class KafkaProducer
//note: 取消正在进行事务,任何没有 flush 的数据都会被丢弃
public void abortTransaction() throws ProducerFencedException {
throwIfNoTransactionManager();
TransactionalRequestResult result = transactionManager.beginAbort();
sender.wakeup();
result.await();
}

// class TransactionManager
public synchronized TransactionalRequestResult beginAbort() {
ensureTransactional();
if (currentState != State.ABORTABLE_ERROR)
maybeFailWithError();
transitionTo(State.ABORTING_TRANSACTION);

// We're aborting the transaction, so there should be no need to add new partitions
newPartitionsInTransaction.clear();
return beginCompletingTransaction(TransactionResult.ABORT);
}

它们最终都是调用了 TransactionManager 的 beginCompletingTransaction() 方法,这个方法会向其 待发送请求列表 中添加 EndTxnRequest 请求,其实现如下:

1
2
3
4
5
6
7
8
9
10
//note: 发送 EndTxnRequest 请求,添加到 pending 队列中
private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
producerIdAndEpoch.epoch, transactionResult);
EndTxnHandler handler = new EndTxnHandler(builder);
enqueueRequest(handler);
return handler.result;
}

TransactionCoordinator 在收到 EndTxnRequest 请求后,会做以下处理:

  1. 更新事务的 meta 信息,状态转移成 PREPARE_COMMIT 或 PREPARE_ABORT,并将事务状态信息持久化到事务日志中;
  2. 根据事务 meta 信息,向其涉及到的所有 Topic-Partition 的 leader 发送 Transaction Marker 信息(也就是 WriteTxnMarkerRquest 请求,见下面的 5.2 分析);
  3. 最后将事务状态更新为 COMMIT 或者 ABORT,并将事务的 meta 持久化到事务日志中,也就是 5.3 步骤。
5.2. WriteTxnMarkerRquest

WriteTxnMarkerRquest 是 TransactionCoordinator 收到 Producer 的 EndTxnRequest 请求后向其他 Broker 发送的请求,主要是告诉它们事务已经完成。不论是普通的 Topic-Partition 还是 __consumer_offsets,在收到这个请求后,都会把事务结果(Transaction Marker 的格数据式见前面)持久化到对应的日志文件中,这样下游 Consumer 在消费这个数据时,就知道这个事务是 commit 还是 abort。

5.3. Writing the Final Commit or Abort Message

当这个事务涉及到所有 Topic-Partition 都已经把这个 marker 信息持久化到日志文件之后,TransactionCoordinator 会将这个事务的状态置为 COMMIT 或 ABORT,并持久化到事务日志文件中,到这里,这个事务操作就算真正完成了,TransactionCoordinator 缓存的很多关于这个事务的数据可以被清除了。

消费者

消费者部分要解决思考问题:

  1. 如何避免消费者读到未提交的消息;
  2. 在事务消息中间(未提交/中断)的非事务消息消费者能否消费到;
  3. 如何保证有未完成事务消息和非事务消息夹杂的顺序消费;
  4. 如何在哪里过滤中断的事务;

消费者有一个 isolation.level 配置,有两个消费策略可选:

  • read_uncommitted:以偏移顺序使用已提交和未提交的消息。
  • read_committed:仅以偏移量顺序使用非事务性消息或已提交事务性消息。为了维护偏移排序,这个设置意味着我们必须在使用者中缓冲消息,直到看到给定事务中的所有消息。

对于read_uncommitted策略有没有事务完全没有影响一样消费;对于read_committed策略,则需要避免消费到。

Last Stable Offset(LSO)

为了解决上面的一系列问题,kafka定义了一个新的offset 概念。

The LSO is defined as the latest offset such that the status of all transactional messages at lower offsets have been determined (i.e. committed or aborted).

对于一个 Partition 而言,offset 小于 LSO 的数据,全都是已经确定的数据,这个主要是对于事务操作而言,在这个 offset 之前的事务操作都是已经完成的事务(已经 commit 或 abort),如果这个 Partition 没有涉及到事务数据,那么 LSO 就是其 HW(水位)。
**
Server 处理 read_committed 类型的 Fetch 请求;

如果 Consumer 的消费策略设置的是 read_committed,其在向 Server 发送 Fetch 请求时,Server 端只会返回 LSO 之前的数据,在 LSO 之后的数据不会返回。

通过LSO就能够解决上面的1、2、3问题,那么LSO有没有弊端?
当然是有的,那就是 long transaction,比如其 first offset 是 1000,另外有几个已经完成的小事务操作,比如:txn1(offset:1100~1200)、txn2(offset:1400~1500),假设此时的 LSO 是 1000,也就是说这个 long transaction 还没有完成,那么已经完成的 txn1、txn2 也会对 consumer 不可见(假设都是 commit 操作),此时受 long transaction 的影响可能会导致数据有延迟

消费过滤

  1. 问题4:在哪里过滤中断的事务

kafka是在消费者客户端进行过滤的,也就是abort的消息消费者也会获取.
**

  1. 如果拉取到的数据只有事务的一部分后面的marker数据还未获取,怎么处理

消费者拉取到的这批数据并不能保证都是完整的事务数据,很有可能是拉取到一个事务的部分数据(marker 数据还没有拉取到),这时候应该怎么办?难道 Consumer 先把这部分数据缓存下来,等后面的 marker 数据到来时再确认数据应该不应该丢弃?(还是又 OOM 的风险)有没有更好的实现方案?

Kafka 的设计总是不会让我们失望,这部分做的优化也是非常高明,Broker 会追踪每个 Partition 涉及到的 abort transactions,Partition 的每个 log segment 都会有一个单独只写的文件(append-only file)来存储 abort transaction 信息,因为 abort transaction 并不是很多,所以这个开销是可以可以接受的,之所以要持久化到磁盘,主要是为了故障后快速恢复,要不然 Broker 需要把这个 Partition 的所有数据都读一遍,才能直到哪些事务是 abort 的,这样的话,开销太大(如果这个 Partition 没有事务操作,就不会生成这个文件)。这个持久化的文件是以 .txnindex 做后缀,前面依然是这个 log segment 的 offset 信息,存储的数据格式如下:

1
2
3
4
5
6
TransactionEntry =>
Version => int16
PID => int64
FirstOffset => int64
LastOffset => int64
LastStableOffset => int64

有了这个设计,Consumer 在拉取数据时,Broker 会把这批数据涉及到的所有 abort transaction 信息都返回给 Consumer,Server 端会根据拉取的 offset 范围与 abort transaction 的 offset 做对比,返回涉及到的 abort transaction 集合,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
val abortedTransactions = ListBuffer.empty[AbortedTxn]
for ((abortedTxn, _) <- iterator()) {
if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
abortedTransactions += abortedTxn //note: 这个 abort 的事务有在在这个范围内,就返回

if (abortedTxn.lastStableOffset >= upperBoundOffset)
return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
}
TxnIndexSearchResult(abortedTransactions.toList, isComplete = false)
}

Consumer 在拿到这些数据之后,会进行相应的过滤,大概的判断逻辑如下(Server 端返回的 abort transaction 列表就保存在 abortedTransactions 集合中,abortedProducerIds 最开始时是为空的):

  1. 如果这个数据是 control msg(也即是 marker 数据),是 ABORT 的话,那么与这个事务相关的 PID 信息从 abortedProducerIds 集合删掉,是 COMMIT 的话,就忽略(每个这个 PID 对应的 marker 数据收到之后,就从 abortedProducerIds 中清除这个 PID 信息,清除的意思是这个事务你都解析到事务的结束标记了,后面不会再有该事务了);
  2. 如果这个数据是正常的数据,把它的 PID 和 offset 信息与 abortedTransactions 队列(有序队列,头部 transaction 的 first offset 最小)第一个 transaction 做比较,如果 PID 相同,并且 offset 大于等于这个 transaction 的 first offset,就将这个 PID 信息添加到 abortedProducerIds 集合中,同时从 abortedTransactions 队列中删除这个 transaction,最后再丢掉这个 batch(它是 abort transaction 的数据);
  3. 检查这个 batch 的 PID 是否在 abortedProducerIds 集合中,在的话,就丢弃,不在的话就返回上层应用。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    private class CompletedFetch {
    private final Iterator<? extends RecordBatch> batches;
    private final Set<Long> abortedProducerIds;
    private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions;
    private RecordBatch currentBatch;
    private Record lastRecord;
    private CloseableIterator<Record> records;
    // ...省略大部分代码

    private Record nextFetchedRecord() {
    while (true) {
    if (records == null || !records.hasNext()) {
    maybeCloseRecordStream();

    if (!batches.hasNext()) {
    // Message format v2 preserves the last offset in a batch even if the last record is removed
    // through compaction. By using the next offset computed from the last offset in the batch,
    // we ensure that the offset of the next fetch will point to the next batch, which avoids
    // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
    // fetching the same batch repeatedly).
    if (currentBatch != null)
    nextFetchOffset = currentBatch.nextOffset();
    drain();
    return null;
    }
    // 迭代
    currentBatch = batches.next();
    lastEpoch = currentBatch.partitionLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
    Optional.empty() : Optional.of(currentBatch.partitionLeaderEpoch());

    maybeEnsureValid(currentBatch);
    // 事务级别是READ_COMMITTED,而且现在这个batch是有PID(可能有事务)
    if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
    // remove from the aborted transaction queue all aborted transactions which have begun
    // before the current batch's last offset and add the associated producerIds to the
    // aborted producer set
    // 更新小于等于offset的abortedTransaction的pid到abortedProducerIds
    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());

    long producerId = currentBatch.producerId();
    // 判断batch是不是Abort标记消息
    if (containsAbortMarker(currentBatch)) {
    // 将该PID移除,因为已经解析到Abort标记消息了,说明该PID的事务消息已经结束(解析到事务结尾了)
    abortedProducerIds.remove(producerId);
    // 判断currentBatch是不是已经abort了,如果是则忽略该消息
    // 这种情况是解析到事务的中间的消息,这个时候通过abortedProducerIds集合进行判断
    } else if (isBatchAborted(currentBatch)) {
    log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
    "offsets {} to {}",
    partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
    nextFetchOffset = currentBatch.nextOffset();
    continue;
    }
    }

    records = currentBatch.streamingIterator(decompressionBufferSupplier);
    } else {
    Record record = records.next();
    // skip any records out of range
    if (record.offset() >= nextFetchOffset) {
    // we only do validation when the message should not be skipped.
    maybeEnsureValid(record);

    // control records are not returned to the user
    if (!currentBatch.isControlBatch()) {
    return record;
    } else {
    // Increment the next fetch offset when we skip a control batch.
    nextFetchOffset = record.offset() + 1;
    }
    }
    }
    }
    }
    // 更新小于等于offset的abortedTransaction的pid到abortedProducerIds
    private void consumeAbortedTransactionsUpTo(long offset) {
    if (abortedTransactions == null)
    return;

    while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset <= offset) {
    FetchResponse.AbortedTransaction abortedTransaction = abortedTransactions.poll();
    abortedProducerIds.add(abortedTransaction.producerId);
    }
    }
    // 是否是abort
    private boolean isBatchAborted(RecordBatch batch) {
    return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
    }
    // 判断batch是不是Abort标记消息
    private boolean containsAbortMarker(RecordBatch batch) {
    if (!batch.isControlBatch())
    return false;

    Iterator<Record> batchIterator = batch.iterator();
    if (!batchIterator.hasNext())
    return false;

    Record firstRecord = batchIterator.next();
    return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
    }
    }

过滤流程可以参考https://zhmin.github.io/2019/04/20/kafka-consumer-transaction/,实际消费者的代码已经有所不同,我改为2.4.1版本的实现。

参考

本文很大篇幅都是以下文章中的内容,想要更深入理解可以参考一下文章。

kafka系列九、kafka事务原理、事务API和使用场景
Kafka Exactly-Once Data Flow
kafka事务性实现
Kafka Consumer 读取事务消息