主要从Kafka特性和使用角度来学Kafka
(1).Kafka吞吐量高的三个原因
1.底层采用零拷贝
2.可以批量来发送接收数据
3.磁盘顺序存储
4.分区存储,分区消费
拓展描述下零拷贝:
常见调用进程读取a.txt文件,发送a.txt字节流到其他机器过程描述:
1.读取磁盘文件,存入到内核态的内存页缓存上。
2.切换到用户态,将页缓存上的数据拷贝到用户态内存上。(一般情况下如果做计算操作,那就在用户编写的代码上体现)
3.用户态切换回内核态,将数据拷贝到socket buffer上
4.将socket buffer上到数据发送到网卡buffer上传输
零拷贝所做到:
1.读取磁盘文件,存入到内核态的内存页缓存上。(same)
2.通过gather机制,底层调用sendfile方法,页缓存+socket fd(存储offset等信息),直接将数据拷到网卡内存。
(2) kafka消费者端保证数据不丢和数据不重复消费
不重复消费可以通过Mysql唯一索引来实现,采用先查再改的方式,由于是多台实例消费,这种情况下事务的原因,会导致插入失败的异常。记得try{}catch{}跳过此次循环。
不丢失,就通过每次消费完后,手动提交实现即可。
(3) kafka生产者端保证数据不丢和数据不重复消费且保证发送的吞吐量
不重复同上。
不丢失通过设置ack=-1来实现,要让kafka集群中的主节点以及ISR中的节点都返回ACK,才算发送成功。
生产者端的吞吐量由批量发送来决定,涉及的参数如下:
1.batch.size: 每次数据叠加达到这个数据量,就发送。 默认16KB
2.max.request.size 当一条数据超出这个量,就抛出异常。 默认1MB
3.linger.ms 每过linger.ms,将叠加的数据发送。默认100ms
4.buffer.mermory 缓存的总内存大小,超过这个内存,发送数据的线程阻塞。 默认32MB
生产者端源码框架(盗图):
简单介绍下两个方法的调用逻辑,
1.调用构造方法 ==>创建序列化器,分区器,拦截器,底层开启sender(I/O)线程,轮训从RecoderAccumlator中取出数据,一般还是从map中获取一个Deque,synchronized(deque),再执行操作。保证线程安全。
2.调用send()方法 ==>进行了batchsize-request.max.size之间的一些判断,然后动态分配内存,存入RecordBatch。也是线程安全的,通过map中获取一个Deque,synchronized(deque)。
判断逻辑如下: 超过request.max.size抛出异常,batchsize-request.max.size之间,动态腾出内存空间完成,小于batchSize继续积累或者到了linger.ms时间发送。