kafka之七-幂等性

kafka中的exactly once semantics

kafka0.11.0.0版本正式支持精确一次处理语义(exactly once semantics,EOS),EOS主要体现在3个方面:

  • 幂等producer:保证发送单个分区的消息只会发送一次,不会出现重复消息
  • 事务:保证原子性地写入到多个分区,即写入到多个分区的消息要么全成功,要么失败回滚
  • 流处理EOS:流处理本质上可看成是‘读取-处理-写入’的管道。此EOS保证整个过程的操作是原子性。注意,这只适用kafka streams。

上面3种EOS语义有着不同的应用范围,幂等producer只能保证单分区上无重复消息;事务可以保证多分区写入消息的完整性;而流处理EOS保证的是端到端(E2E)消息处理的EOS。不同的配置如下:

  • 启用幂等producer:在producer程序中设置属性enable.idempotence=true,注意不要设置transactional.id,不要设置成null也不要设置成空字符串
  • 启用带伤支持:在producer程序中设置属性transcational.id为一个指定字符串,同时设置enable.idempotence=true
  • 启用流式处理EOS:在kafka streams程序中设置processing.guarantee=exactly_once

接下来聊聊producer幂等性

producer幂等性

producer幂等性指的是当发送同一条消息时,数据在server端只会被持久化一次,数据不丢不重,但是这里的幂等是有条件的:

  • 只能保证producer在单个会话内不丢不重,如果producer出现重启是无法保证幂等性的(在设置了幂等的情况下,是无法获取之前的状态信息的,因此无法做到跨会话级别的不丢不重)
  • 幂等性不能跨多个topic-partition,只能保证单个partition内的幂等性,当涉及多个topic-partition时,这中间的状态并没有同步。

如果需要跨会话、跨多个topic-partition的情况,需要使用kafka的事务性来实现。

幂等示例

Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 all
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord(topic, "test");

幂等实现原理

kafka proudcer在实现幂等时有两个重要机制:

  1. PID(Producer ID),用来标识每个producer client
  2. sequence numbers,client发送的每条消息都会带相应的sn,server端再根据这个值来判断数据是否重复

PID

每个producer在初始化时都会被分配一个唯一的PID,这个PID对用户是透明的,没有暴露给用户。对于一个给定的PID,sequence number将会从0开始自增,每个topic-partition都会有一个独立的sequence number。producer在发送数据时,将会给每条消息标识一个sn,然后server以此来验重。这里的PID是全局唯一的,如果producer重启后会被分配一个新的PID,这也是幂等性无法做到跨会话的一个原因。

PID申请与管理

producer向broker发送一个请求获取PID(server端会选择一台连接数量最少的broker进行处理),broker收到申请PID的请求后,会尝试在ZK创建一个/latest_producer_id_block节点,每个broker向ZK申请一个PID段后,都会将自己申请的PID段信息写入到这个节点,这样当其他broker再申请PID段时,会首先读取这个节点的信息,然后根据block_end选择一个PID段,最后再将信息写回到ZK的这个节点里,这个节点信息内容如下:

{"version":1,"broker":35,"block_start":"4000","block_end":"4999"}

broker与ZK交互流程:

  1. 先从ZK的/latest_producer_id_block节点读取最新已经分配的PID段信息
  2. 如果该节点不存在,直接从0开始分配,选择0-1000的PID段(PidBlockSize默认为1000)
  3. 如果该节点存在,读取其中数据,根据block_end选择这个PID段
  4. 在选择了相应的PID段后,将这个PID段信息写回到ZK的这个节点中,如果写入成功,证明PID段申请成功;如果失败证明此时可能其它的broker已经更新了这个节点,就需要从步骤1重新开始执行

针对第4点再说明下,在回写ZK节点时会判断当前节点的zkVersion是否与第1点里获取的zkVersion相同,如果相同才可以写入成功,不同说明这个节点被修改了则会写入失败。

client幂等时发送流程

java producer(区别于scala producer)是双线程设计,分为用户主线程与sender线程,前者调用send方法将消息写入到producer的内存缓冲区,即RecordAccumulator中,后者会定期从RecordAccumulator中获取消息并将消息归入不同的batch中发送到对应的broker上。在幂等producer中,用户主线程的逻辑变动不大。send方法依然是将消息写入到RecordAccumulator。而Sender线程却有着很大的改支。

  1. 用户线程调用send方法将数据添加到RecordAccumulator中,添加时会判断是否需要新建一个ProducerBatch,这时的ProducerBatch还是没有PID和sequence number
  2. sender线程在执行时,判断当前PID是否需要重置:如果有消息重试多次人失败最后因为超时而被移除,这时的seqeunce number有部分已经分配出去,那这是不允许发送的。
  3. sender线程阻塞获取PID
  4. 在ProducerBatch里设置相应的PID与sequence number,进行发送。

server端处理producer请求

server端会主要检验请求里是否有PID信息,校验batch是否重复:

  1. 如果PID不存在,那么判断sequence number是否从0开始,是的话,在缓存中记录PID的meta信息(PID,epoch,sequence number),并执行写入操作,否则返回UnknowProducerIdException(PID在server端已经过期或这个PID写的数据已经过期,但producer还在接着上次的sn发送数据)
  2. 如果PID存在,先检查PID epoch与server端记录的是否相同
  3. 如果不同,且sn不是从0开始,那么返回OutOfOrderSequenceException异常
  4. 如果相同,那么根据缓存中记录的最近一次sn(currentLastSeq)检查是否为连接,不连接的情况下那么返回OutOfOrderSequenceException异常

最后总结一下幂等消息处理流程:

  1. 每个producer会被分配一个PID,SN
  2. producer与broker端都有<PID,PartitionID>与SN的映射关系
  3. producer每发送一条消息后就将对应的分区序列号加一
  4. broker会比较序列号,如果new SN < old SN+1,说明是过期的,会抛弃这条数据;如果new SN > old SN+1,说明有消息丢失了,抛出异常。

参考文章:
Kafka 事务性之幂等性实现
Kafka幂等性及事务
关于Kafka幂等producer的讨论

作者:步履不停原文地址:https://segmentfault.com/a/1190000042254265

%s 个评论

要回复文章请先登录注册