1.1 消息发送核心流程

1.2 生产者初始化主要过程

1.3 消息发送具体流程

1.4 生产者内存池设计

1.1 消息发送核心流程


 

Kafka消息发送机制

1.2 生产者初始化主要过程


1、设置分区器

2、设置重试机制  retry.backoff.ms,默认100ms

3、设置序列化器

4、设置拦截器(国过滤器),意义不大

5、设置获取元数据metadata(每隔5min自动更新一下元数据

Kafka消息发送机制

6、设置单条消息大小,超过这个值消息就不能发送过去,默认1M,实际生产环境比这个要大(经验值10M)

7、设置压缩方式,提高吞吐量,相应的会消耗CPU

8、设置缓存大小,默认32M,一般够用

9、初始化一个管理网络的组件 NetWorkClient(几个重要的参数)

Kafka消息发送机制

10、初始化一个发送消息的线程Sender 并 启动

acks : 0、1、-1

Kafka消息发送机制

1.3 消息发送具体流程


kafkaProducer.send(ProducerRecord record, Callback callback)
record = new ProducerRecord<>(targetTopic, partitionId, timestamp, serializedKey, serializedValue);

1、同步等待拉取元数据

Kafka消息发送机制

2、对key和value进行序列化

3、根据分区器选择消息应该发送的分区,前提是 record 中没有指定 partitionId

1)如果消息没有key,则按照轮询方式发送

2)如果消息有key,则按照hash(key) % partitions 方式发送

4、确认消息大小是否超过单条(默认1M)以及缓存(默认32M)大小上限

5、对每一个消息绑定一个回调函数(异步发送情况)

6、将消息放到缓存中,每个partition对应一个队列

每个队列中有多个批次,单个批次的大小 = Max(16k,单条消息大小),但是如果单条消息>16k的话,就退化为一个消息一个批次

优化点:利用内存池-BufferPoll,避免Full GC

源码的精妙设计:Map<topicPartition,Dqueue<RecordBatch>>,key为缓存中的partition,value为相应的批次队列。这里的Map不是CurrentHasnMap,而是Kafka自己设计的一种读写分离的Map – CopyOnWriteMap,适合高并发多线程下读多(不加lock)写少(加lock)的场景。

因为只有当第一个消息过来没有相应partition时时才会创建kv,而每一个消息都会读取kv!!!

Kafka消息发送机制

1.4 生产者内存池设计


Kafka消息发送机制

创建批次时会申请内存池中的内存单元,在批次发送到服务端后又会归还给内存池异变进行复用。这样可以避免JVM Full GC的发生。

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-11-16
  • 2021-11-12
  • 2022-02-02
  • 2022-12-23
  • 2022-12-23
猜你喜欢
  • 2022-01-06
  • 2021-07-10
  • 2021-05-19
  • 2022-01-14
  • 2021-11-15
  • 2021-06-13
  • 2021-08-23
相关资源
相似解决方案