1. 生产者客户端开发

​ 熟悉kafka的朋友都应该知道kafka客户端有新旧版本,老版本采用scala编写,新版本采用java编写。随着kafka版本的升级,旧版本客户端已经快被完全替代了。因此,我们以新客户端为例进行介绍。

​ 客户端开发的步骤如下:

  • ​ 配置生产者客户端参数及创建相应的生产者实例
  • ​ 构建待发送的信息
  • ​ 发送信息
  • ​ 关闭生产者实例

代码如下:

}

需要maven依赖如下:

​ 这里有必要对构建的消息对象ProduceRecord进行说明,ProduceRecord对象包括以下几个属性:

​ topic和partititon用来指定消息发送到主题分区。header是指消息头部,从0.11.x这个版本引进的。Key是指消息的键,可通过分区号让消息发往特定的分区【可以使key相同的消息发送到同一分区】,有key的消息还可以支持日志压缩的功能。value为消息体,一般不为空,如果为空则表示特定的消息——墓碑消息。timestamp指消息的时间戳,有两种类型CreateTime和LogAppendTime,前者表示消息创建时间,后者表示消息追加到日志文件的时间。

}

​ 通过以下这种方式创建ProduceRecord对象,只是指定了最基本的两个属性,topic和value。ProducerRecord包括多个构造函数,可灵活使用。

1.1 必要的参数配置

​ bootstrap.servers:指定客户端连接的broker地址清单。

​ Key.serializer和value.serializer用于指定消息的key和value的序列化器。

​ client.id指定KafkaProducer的id,默认系统会自动生成。

​ 由于参数的名称特别多,而且是字符串容易写错,因此客户端提供了一个类ProducerConfig,包括所有的参数名称。同样需要注意,由于key和value的序列化器需要类的全限定名,可通过一下方式改进。

prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

。生产者包括一个缓冲区空间池,其中保存尚未传输到服务器的记录,以及一个后台I/O线程,该线程负责将这些记录转换为请求并将它们传输到集群。使用后不关闭生产商将泄漏这些资源。

1.2 消息的发送

​ 发送消息主要有三种方式:发后即忘(fire-and-forget)、同步sync和一部async。

​ (1)fire-and-forget

​ 只管往kafka中发送消息,不管消息是否到达。大多数情况下,这种方式不会出现问题,但当发生不可重试异常时,会造成数据丢失。性能最高,可靠性最差。以下这种方式就是fire-and-forget:

 }

​ (2)sync

​ send()方法是有返回值的,是一个Future对象。

}

​ 实际上send方法本身是异步,可以通过调用Future对象的get方法阻塞等待Kafka的响应,直到消息发送成功或抛出异常【对异常可以做响应的处理】,实现同步。可以从RecordMetadata获取发送成功的ProduceRecord的相关元数据,包括topic、partition、offset、timestamp等。当然也可以通过Future的get(long timeout, TimeUnit unit)实现超时阻塞。

    }

​ 另外,KafkaProducer一般会产生两类异常:可重试异常和不可重试异常。可充实异常有NetworkException、LeaderNotAvaliableException、UnKnownTopicOrPartitonException、NotEnoughRepliasException、NotCoordinatorException。对于可重试异常,可以通过配置retires属性,进行特定次数的重试,重试成功不会抛出异常,重试失败抛出异常。

prop.put(ProducerConfig.RETRIES_CONFIG, 10);

​ 对于不可重复异常,如RecordTooLargeException,发生后支持抛出异常。

​ 同步方式可靠性高,要么消息发送成功,要么发生异常,可捕获进行处理。不过同步方式的性能要差一些,需要阻塞等待消息发送完之后才能发送下一条消息。

(3)async

。即如果record1先于record2先发送 ,则对应的callback1先于callback2被调用。

});

​ 对于producer的close方法也是重载,可以实现超时强行关闭,但是一般不这样使用。

public void close(long timeout, TimeUnit timeUnit)

1.3 序列化

​ 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka集群,同样消费者必须通过与之对应的反序列化器进行解析。kafka-client提供了多种数据类型对象的序列化器,父接口为org.apache.kafka.common.serialization.Serializer接口。

}

​ 自定义序列化器:

​ 这里为了方面使用了lombok框架,maven依赖如下,注意在idea中还要安装响应的插件,否则注解不生效。

</dependency>

​ 创建Company对象的序列化器:

​ 使用的话只需要设置prop的key.serializer等设置为CompanySerializer即可。

1.4 分区器

的一系列作用之后才会发往broker。拦截器不是必须的,序列化器是必须的。消息进过序列化之后就要确定发往那个分区。如果ProducerRecord中指定了partition字段,则不需要分区器的作用,如果没有,则需需要依赖于分区器,根据Producerrecord的key进行分区。

1.4.1 默认分区器

​ 分区器的父接口为org.apache.kafka.clients.producer.Partitioner接口。

默认分区器为:org.apache.kafka.clients.producer.internals.DefalutPartitoner,,源码如下:

}

默认分区器的逻辑就是:

​ 如果key不为空,则进行对key进行hash计算分区

​ 如果为空,且存在可用分区,则在可用分区中轮训,不存在可用分区,则在所有分区中轮训。

1.4.2 自定义分区器

​ 可通过实现partitioner接口,自定义分区器。

​ 分区逻辑就是,有key进行hash分区,无key在所有分区中轮训

}

分区器的配置:

prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

1.5 生产者拦截器

​ 拦截器是在Kafka0.10.0.0版本出现的,有生产者拦截器和消费者拦截器两种。

​ (1)为消息提供定制化的操作

​ (2)可以用来在发送回掉逻辑前做一些定制化的需求。

​ 拦截器通过自定义实现org.apache.kafka.clients.producer.ProducerInterceptor。onSend()方法对消息进行相应的定制化操作;onAcknowledgement()方法会在消息在应答之前或消息发送失败时被调用,因此此方法优先于callback方法执行。

​ 拦截器的使用:

prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());

​ 多个拦截器之间用“,”隔开,注意多个拦截器是有顺序的。

2. 原理分析

​ KafkaProudcer在真正把消息发往Kafka集群时,会依次经历拦截器、序列化器、和分区器,然后缓存到消息累加器RecordAccumulator中,Sender线程负责从RecordAccumulator中获取消息并发送到Kafka集群。

2.1 消息发送到RecordAccumulator

​ KafkaProducer调用send方法发送ProducerRecord,首先会通过拦截器链进行定制化操作,然后调用了doSend方法。发放如下:

    }

(1)waitOnMetadata阻塞式获取metaData,超过${max.block.ms}时间依旧未获取到,则抛TimeoutException,消息发送失败。

(2)对key和value进行序列化

(3)根据消息和集群metaData计算发往的分区,,并创建主题分区对象。

},也会抛出RecordTooLargeException异常。

    }

(5)将消息记录append到RecordAccumulator。

的双端队列。那ProducerBatch又是什么呢。ProducerBatch就是ProducerRecord的批次,可以包括一个或多个ProduerRecord,ProducerBatch的大小可以通过batch.size这个参数设置,不过当一个ProducerRecord的大小超过batch.size的大小时,就会生产一个新的ProducerBatch,这个ProducerBatch的大小就是该ProducerRecord的大小。也许你会产生一个疑问,既然ProducerBatch的大小不一定等于batch.size,那么为什么还要使用这个参数,其实是为了更好的管理内存,在kafka中通过java.io.ByteBuffer实现消息内存的创建和释放,不过为了减少频繁的创建和释放内存空间,RecordAccumulator内部使用了BufferPool实现对特定大小的ByteBuffer进行管理,实现复用,特定大小就是通过batch.size这个参数进行设置,同样如果当前ProducerBatch的大小超过batch.size,那个这个ByteBuffer不能实现复用。

​ RecordAccumulator通过append方法将ProducerRecord追加到具体的ProducerBatch中,过程如下:

​ (1)记录当前正在进行append消息的线程数,方便当客户端调用 KafkaProducer.close()强制关闭发送消息操作时放弃未处理完的请求,释放资源

​ (2)getOrCreateDeque,获得或创建主题分区对应的ProducerBatch的双端队列。

​ (3)tryAppend(timestamp, key, value, headers, callback, dq),尝试将消息append到双端队列。

    }

再来看一下ProducerBatch如何尝试append消息。

    }

(4)若上述尝试append消息失败,即返回null,此时需要向BufferPool申请空间用于创建新的ProducerBatch对象,并将消息append到新创建的ProducerBatch中,最后返回处理结果。

    }

2.2 Sender发送消息至Kafka集群

的size来判断对用的Node中是否堆积了很多未处理的消息,如果真是如此,说明Node节点的网络负载较大或者连接有问题。

2.3 元数据的更新

所谓的元数据是指的Kafka集群的元数据,包括集群中的主题、分区、Leader、Follower等,当客户端不存在需要使用的元数据信息或者超过metadata.max.age.ms[默认5分钟],会引起元数据的更新。当元数据需要更新时,会首先挑选出负载最小的node,向他发送MetaDataRequest请求,这个更新操作由send线程发起,同样会存入InFlightRequest中。由于主线程也需要元数据,因此需要通过synchronize和final关键字保证。

2.4 生产者客户端的重要参数

  • acks

    取值有0,1,-1,用于指定分区中至少有多少副本收到这个现象,之后生产者才会认为该消息被写入。

  • max.request.size

    限制生产者客户端发送消息的最大值

  • reties 生产者发送出现异常时的重试次数
  • 每次重试的时间间隔
  • 生产者端消息的压缩方式
  • connections.max.idles.ms 连接限制关闭时间
  • 用于配置ProducerBatch等待加入ProducerRecord的时间
  • Socket接收消息缓冲区
  • Socket发送器的缓冲区
  • request.timeout.ms 生产者等待请求响应的最长时间,请求超时可以进行重试。这个参数大于broker端的replia.lag.time.max.ms
  • 生产者客户端用于缓存消息的缓冲区大小
  • batch.size 指定ProducerBatch可以复用缓冲区的大小
  • 生产者send方法和paritionFor方法的阻塞时间
  • max.in.flight.requests.per.connection 限制每个链接最多缓存的请求数量
  • 更新元数据的时间

相关文章:

  • 2022-12-23
  • 2019-06-25
  • 2021-04-18
  • 2021-10-09
  • 2021-06-16
  • 2022-02-09
  • 2021-04-27
猜你喜欢
  • 2022-12-23
  • 2022-12-23
  • 2021-12-14
  • 2022-02-05
  • 2022-03-11
  • 2021-05-31
  • 2019-03-02
相关资源
相似解决方案