在kafka中ISR是什么?
在zk中会保存AR(Assigned Replicas)列表,其中包含了分区所有的副本,其中 AR = ISR+OSR
ISR(in sync replica):是kafka动态维护的一组同步副本,在ISR中有成员存活时,只有这个组的成员才可以成为leader,内部保存的为每次提交信息时必须同步的副本(acks = all时),每当leader挂掉时,在ISR集合中选举出一个follower作为leader提供服务,当ISR中的副本被认为坏掉的时候,会被踢出ISR,当重新跟上leader的消息数据时,重新进入ISR。
OSR(out sync replica): 保存的副本不必保证必须同步完成才进行确认,OSR内的副本是否同步了leader的数据,不影响数据的提交,OSR内的follower尽力的去同步leader,可能数据版本会落后。
Kafka中的消息以一下方式存储到文件中。
HW是HighWatermark的缩写,俗称高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
LEO:LogEndOffset的缩写,表示每个partition的log最后一条Message的位置。
当leader挂了之后,现在B成为了leader,A重新恢复之后需要进行消息的同步,如果使用追加的方式那么就会有冗余消息,所以A将自己的消息截取到HW的位置在进行同步。
对于kafka节点活着的条件是什么?
第一点:一个节点必须维持和zk的会话,通过zk的心跳检测实现
第二点:如果节点是一个slave也就是复制节点,那么他必须复制leader节点不能太落后。
这里的落后可以指两种情况
1:数据复制落后,slave节点和leader节点的数据相差较大,这种情况有一个缺点,在生产者突然发送大量消息导致网络堵塞后,大量的slave复制受阻,导致数据复制落后被大量的踢出ISR。
2:时间相差过大,指的是slave向leader请求复制的时间距离上次请求相隔时间过大。通过配置replica.lag.time.max就可以配置这个时间参数。
I / O瓶颈 ,新启动副本 ,GC暂停或follower失效或死亡。
kafka分区partition挂掉之后如何恢复?
在kafka中有一个partition recovery机制用于恢复挂掉的partition。
每个Partition会在磁盘记录一个RecoveryPoint(恢复点), 记录已经flush到磁盘的最大offset。当broker fail 重启时,会进行loadLogs。 首先会读取该Partition的RecoveryPoint,找到包含RecoveryPoint点上的segment及以后的segment, 这些segment就是可能没有完全flush到磁盘segments。然后调用segment的recover,重新读取各个segment的msg,并重建索引。
优点:
以segment为单位管理Partition数据,方便数据生命周期的管理,删除过期数据简单
在程序崩溃重启时,加快recovery速度,只需恢复未完全flush到磁盘的segment即可
那么ISR是如何实现同步的呢?
broker 收到producer的请求
leader 收到消息,并成功写入,LEO 值+1
broker 将消息推给follower replica,follower 成功写入 LEO +1
…
所有LEO 写入后,leader HW +1
消息可被消费,并成功响应
上述过程从下面的图便可以看出:
消息可靠性
Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。通过调节其副本相关参数,可以使得Kafka在性能和可靠性之间运转的游刃有余。、
Producer发送消息的配置
1 同步模式
kafka有同步(sync)、异步(async)以及oneway这三种发送方式
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
(1)Producer\
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
一个消息如何算投递成功,Kafka提供了三种模式:
- 第一种是啥都不管,发送出去就当作成功,这种情况当然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和所有Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,但是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数情况下都会中和可靠性和性能选择第三种模型、
1. request.required.acks=1 (默认)
当ack=1,表示producer写partition leader成功后,broker就返回成功,无论其他的partition follower是否写成功。
\此时ISR中的副本还没有来得及拉取该消息,leader就宕机了,那么此次发送的消息就会丢失。\
2. request.required.acks=-1
同步(Kafka默认为同步,即producer.type=sync)的发送模式,
数据发送到leader, ISR的follower全部完成数据同步后,
表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。
\leader此时挂掉,那么会选举出新的leader,数据不会丢失。
3. request.required.acks=2
当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。
3. request.required.acks=0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。\
要保证数据不丢除了设置acks=-1, 还要保 证ISR的大小大于等于2,具体参数设置:
• (1)request.required.acks: 设置为-1 等待所有ISR列表中的Replica接收到消息后采算写成功;
• (2)min.insync.replicas: 设置为大于等于2,保证ISR中至少有两个Replica
Producer要在吞吐率和数据可靠性之间做一个权衡。
Broker消息的存储可靠性
1、刷盘时机
broker的刷盘时机主要是以下两个参数控制:
log.flush.interval.ms 日志刷盘的时间间隔,每隔多少时间将消息刷到磁盘上
log.flush.interval.messages 日志刷盘的消息量,每积累多少条消息将消息刷到磁盘上
2、副本数
在创建消息Topic的时候需要指定消息的副本数 replicas
一般建议设置成3保证消息的可靠,再结合客户端发送方的ack参数,当ack参数设置为0表示不等待broker响应就发送下一条消息,当ack设置为1则表示需要等待leader响应,当ack设置为all则表示需要等待所有的replicas都响应后才返回响应,其中all是最高可靠级别了,但是同时也降低了吞吐率。
消息的顺序消费问题
Producer
发送端不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。
(2)存储端
消息不能分区。也就是1个topic,只能有1个队列。
在Kafka中,它叫做partition; 如果你有多个partition队列,那同1个topic的消息,会分散到多个分区里面,自然不能保证顺序。
即使只有1个队列的情况下,会有第2个问题。该机器挂了之后,能否切换到其他机器?也就是高可用问题。比如你当前的机器挂了,上面还有消息没有消费完。此时切换到其他机器,可用性保证了。但消息顺序就乱掉了。要想保证,一方面要同步复制,不能异步复制;另1方面得保证,切机器之前,挂掉的机器上面,所有消息必须消费完了,不能有残留。很明显,这个很难。
(3)接收端
对于接收端,不能并行消费,也即不能开多线程或者多个客户端消费同1个队列。
消费者 Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。所以必须维护着上一次读到哪里的offsite信息。 (offset手动提交,业务逻辑成功处理后,提交offset) .,落表(选择唯一主键存储到Redis 中,先查询是否存在,若存在则不处理;若不存在,先插入Redis ,再进行业务逻辑处理 ,避免重复数据)
各场景测试总结:
- 当acks=-1时,Kafka发送端的TPS受限于topic的副本数量(ISR中),副本越多TPS越低;
- acks=0时,TPS最高,其次为1,最差为-1,即TPS:acks_0 > acks_1 > ack_-1
- min.insync.replicas参数不影响TPS;
- partition的不同会影响TPS,随着partition的个数的增长TPS会有所增长,但并不是一直成正比关系,到达一定临界值时,partition数量的增加反而会使TPS略微降低;
- Kafka在acks=-1,min.insync.replicas>=1时,具有高可靠性,所有成功返回的消息都可以落盘。
广泛应用于大数据领域:如网站行为分析、日志聚合、Apps监控等场景;
让数据集成变得简单:能将 Kafka 中的消息导入到 ODPS、HBase、HBASE 等离线数据仓库;
可广泛的与流计算引擎集成:包括阿里云平台的 StreamCompute、E-MapReduce 和开源产品 Spark、Storm 等;