1.1 消息发送核心流程
1.2 生产者初始化主要过程
1、设置分区器
2、设置重试机制 retry.backoff.ms,默认100ms
3、设置序列化器
4、设置拦截器(国过滤器),意义不大
5、设置获取元数据metadata(每隔5min自动更新一下元数据)
6、设置单条消息大小,超过这个值消息就不能发送过去,默认1M,实际生产环境比这个要大(经验值10M)
7、设置压缩方式,提高吞吐量,相应的会消耗CPU
8、设置缓存大小,默认32M,一般够用
9、初始化一个管理网络的组件 NetWorkClient(几个重要的参数)
10、初始化一个发送消息的线程Sender 并 启动
acks : 0、1、-1
1.3 消息发送具体流程
kafkaProducer.send(ProducerRecord record, Callback callback)
record = new ProducerRecord<>(targetTopic, partitionId, timestamp, serializedKey, serializedValue);
1、同步等待拉取元数据
2、对key和value进行序列化
3、根据分区器选择消息应该发送的分区,前提是 record 中没有指定 partitionId
1)如果消息没有key,则按照轮询方式发送
2)如果消息有key,则按照hash(key) % partitions 方式发送
4、确认消息大小是否超过单条(默认1M)以及缓存(默认32M)大小上限
5、对每一个消息绑定一个回调函数(异步发送情况)
6、将消息放到缓存中,每个partition对应一个队列
每个队列中有多个批次,单个批次的大小 = Max(16k,单条消息大小),但是如果单条消息>16k的话,就退化为一个消息一个批次
优化点:利用内存池-BufferPoll,避免Full GC
源码的精妙设计:Map<topicPartition,Dqueue<RecordBatch>>,key为缓存中的partition,value为相应的批次队列。这里的Map不是CurrentHasnMap,而是Kafka自己设计的一种读写分离的Map – CopyOnWriteMap,适合高并发多线程下读多(不加lock)写少(加lock)的场景。
因为只有当第一个消息过来没有相应partition时时才会创建kv,而每一个消息都会读取kv!!!
1.4 生产者内存池设计
创建批次时会申请内存池中的内存单元,在批次发送到服务端后又会归还给内存池异变进行复用。这样可以避免JVM Full GC的发生。