1. 生产者客户端开发
熟悉kafka的朋友都应该知道kafka客户端有新旧版本,老版本采用scala编写,新版本采用java编写。随着kafka版本的升级,旧版本客户端已经快被完全替代了。因此,我们以新客户端为例进行介绍。
客户端开发的步骤如下:
- 配置生产者客户端参数及创建相应的生产者实例
- 构建待发送的信息
- 发送信息
- 关闭生产者实例
代码如下:
}
需要maven依赖如下:
这里有必要对构建的消息对象ProduceRecord进行说明,ProduceRecord对象包括以下几个属性:
topic和partititon用来指定消息发送到主题分区。header是指消息头部,从0.11.x这个版本引进的。Key是指消息的键,可通过分区号让消息发往特定的分区【可以使key相同的消息发送到同一分区】,有key的消息还可以支持日志压缩的功能。value为消息体,一般不为空,如果为空则表示特定的消息——墓碑消息。timestamp指消息的时间戳,有两种类型CreateTime和LogAppendTime,前者表示消息创建时间,后者表示消息追加到日志文件的时间。
}
通过以下这种方式创建ProduceRecord对象,只是指定了最基本的两个属性,topic和value。ProducerRecord包括多个构造函数,可灵活使用。
1.1 必要的参数配置
bootstrap.servers:指定客户端连接的broker地址清单。
Key.serializer和value.serializer用于指定消息的key和value的序列化器。
client.id指定KafkaProducer的id,默认系统会自动生成。
由于参数的名称特别多,而且是字符串容易写错,因此客户端提供了一个类ProducerConfig,包括所有的参数名称。同样需要注意,由于key和value的序列化器需要类的全限定名,可通过一下方式改进。
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
。生产者包括一个缓冲区空间池,其中保存尚未传输到服务器的记录,以及一个后台I/O线程,该线程负责将这些记录转换为请求并将它们传输到集群。使用后不关闭生产商将泄漏这些资源。
1.2 消息的发送
发送消息主要有三种方式:发后即忘(fire-and-forget)、同步sync和一部async。
(1)fire-and-forget
只管往kafka中发送消息,不管消息是否到达。大多数情况下,这种方式不会出现问题,但当发生不可重试异常时,会造成数据丢失。性能最高,可靠性最差。以下这种方式就是fire-and-forget:
}
(2)sync
send()方法是有返回值的,是一个Future对象。
}
实际上send方法本身是异步,可以通过调用Future对象的get方法阻塞等待Kafka的响应,直到消息发送成功或抛出异常【对异常可以做响应的处理】,实现同步。可以从RecordMetadata获取发送成功的ProduceRecord的相关元数据,包括topic、partition、offset、timestamp等。当然也可以通过Future的get(long timeout, TimeUnit unit)实现超时阻塞。
}
另外,KafkaProducer一般会产生两类异常:可重试异常和不可重试异常。可充实异常有NetworkException、LeaderNotAvaliableException、UnKnownTopicOrPartitonException、NotEnoughRepliasException、NotCoordinatorException。对于可重试异常,可以通过配置retires属性,进行特定次数的重试,重试成功不会抛出异常,重试失败抛出异常。
prop.put(ProducerConfig.RETRIES_CONFIG, 10);
对于不可重复异常,如RecordTooLargeException,发生后支持抛出异常。
同步方式可靠性高,要么消息发送成功,要么发生异常,可捕获进行处理。不过同步方式的性能要差一些,需要阻塞等待消息发送完之后才能发送下一条消息。
(3)async
。即如果record1先于record2先发送 ,则对应的callback1先于callback2被调用。
});
对于producer的close方法也是重载,可以实现超时强行关闭,但是一般不这样使用。
public void close(long timeout, TimeUnit timeUnit)
1.3 序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka集群,同样消费者必须通过与之对应的反序列化器进行解析。kafka-client提供了多种数据类型对象的序列化器,父接口为org.apache.kafka.common.serialization.Serializer接口。
}
自定义序列化器:
这里为了方面使用了lombok框架,maven依赖如下,注意在idea中还要安装响应的插件,否则注解不生效。
</dependency>
创建Company对象的序列化器:
使用的话只需要设置prop的key.serializer等设置为CompanySerializer即可。
1.4 分区器
的一系列作用之后才会发往broker。拦截器不是必须的,序列化器是必须的。消息进过序列化之后就要确定发往那个分区。如果ProducerRecord中指定了partition字段,则不需要分区器的作用,如果没有,则需需要依赖于分区器,根据Producerrecord的key进行分区。
1.4.1 默认分区器
分区器的父接口为org.apache.kafka.clients.producer.Partitioner接口。
默认分区器为:org.apache.kafka.clients.producer.internals.DefalutPartitoner,,源码如下:
}
默认分区器的逻辑就是:
如果key不为空,则进行对key进行hash计算分区
如果为空,且存在可用分区,则在可用分区中轮训,不存在可用分区,则在所有分区中轮训。
1.4.2 自定义分区器
可通过实现partitioner接口,自定义分区器。
分区逻辑就是,有key进行hash分区,无key在所有分区中轮训
}
分区器的配置:
prop.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
1.5 生产者拦截器
拦截器是在Kafka0.10.0.0版本出现的,有生产者拦截器和消费者拦截器两种。
(1)为消息提供定制化的操作
(2)可以用来在发送回掉逻辑前做一些定制化的需求。
拦截器通过自定义实现org.apache.kafka.clients.producer.ProducerInterceptor。onSend()方法对消息进行相应的定制化操作;onAcknowledgement()方法会在消息在应答之前或消息发送失败时被调用,因此此方法优先于callback方法执行。
拦截器的使用:
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class.getName());
多个拦截器之间用“,”隔开,注意多个拦截器是有顺序的。
2. 原理分析
KafkaProudcer在真正把消息发往Kafka集群时,会依次经历拦截器、序列化器、和分区器,然后缓存到消息累加器RecordAccumulator中,Sender线程负责从RecordAccumulator中获取消息并发送到Kafka集群。
2.1 消息发送到RecordAccumulator
KafkaProducer调用send方法发送ProducerRecord,首先会通过拦截器链进行定制化操作,然后调用了doSend方法。发放如下:
}
(1)waitOnMetadata阻塞式获取metaData,超过${max.block.ms}时间依旧未获取到,则抛TimeoutException,消息发送失败。
(2)对key和value进行序列化
(3)根据消息和集群metaData计算发往的分区,,并创建主题分区对象。
},也会抛出RecordTooLargeException异常。
}
(5)将消息记录append到RecordAccumulator。
的双端队列。那ProducerBatch又是什么呢。ProducerBatch就是ProducerRecord的批次,可以包括一个或多个ProduerRecord,ProducerBatch的大小可以通过batch.size这个参数设置,不过当一个ProducerRecord的大小超过batch.size的大小时,就会生产一个新的ProducerBatch,这个ProducerBatch的大小就是该ProducerRecord的大小。也许你会产生一个疑问,既然ProducerBatch的大小不一定等于batch.size,那么为什么还要使用这个参数,其实是为了更好的管理内存,在kafka中通过java.io.ByteBuffer实现消息内存的创建和释放,不过为了减少频繁的创建和释放内存空间,RecordAccumulator内部使用了BufferPool实现对特定大小的ByteBuffer进行管理,实现复用,特定大小就是通过batch.size这个参数进行设置,同样如果当前ProducerBatch的大小超过batch.size,那个这个ByteBuffer不能实现复用。
RecordAccumulator通过append方法将ProducerRecord追加到具体的ProducerBatch中,过程如下:
(1)记录当前正在进行append消息的线程数,方便当客户端调用 KafkaProducer.close()强制关闭发送消息操作时放弃未处理完的请求,释放资源
(2)getOrCreateDeque,获得或创建主题分区对应的ProducerBatch的双端队列。
(3)tryAppend(timestamp, key, value, headers, callback, dq),尝试将消息append到双端队列。
}
再来看一下ProducerBatch如何尝试append消息。
}
(4)若上述尝试append消息失败,即返回null,此时需要向BufferPool申请空间用于创建新的ProducerBatch对象,并将消息append到新创建的ProducerBatch中,最后返回处理结果。
}
2.2 Sender发送消息至Kafka集群
的size来判断对用的Node中是否堆积了很多未处理的消息,如果真是如此,说明Node节点的网络负载较大或者连接有问题。
2.3 元数据的更新
所谓的元数据是指的Kafka集群的元数据,包括集群中的主题、分区、Leader、Follower等,当客户端不存在需要使用的元数据信息或者超过metadata.max.age.ms[默认5分钟],会引起元数据的更新。当元数据需要更新时,会首先挑选出负载最小的node,向他发送MetaDataRequest请求,这个更新操作由send线程发起,同样会存入InFlightRequest中。由于主线程也需要元数据,因此需要通过synchronize和final关键字保证。
2.4 生产者客户端的重要参数
-
acks
取值有0,1,-1,用于指定分区中至少有多少副本收到这个现象,之后生产者才会认为该消息被写入。
-
max.request.size
限制生产者客户端发送消息的最大值
- reties 生产者发送出现异常时的重试次数
- 每次重试的时间间隔
- 生产者端消息的压缩方式
- connections.max.idles.ms 连接限制关闭时间
- 用于配置ProducerBatch等待加入ProducerRecord的时间
- Socket接收消息缓冲区
- Socket发送器的缓冲区
- request.timeout.ms 生产者等待请求响应的最长时间,请求超时可以进行重试。这个参数大于broker端的replia.lag.time.max.ms
- 生产者客户端用于缓存消息的缓冲区大小
- batch.size 指定ProducerBatch可以复用缓冲区的大小
- 生产者send方法和paritionFor方法的阻塞时间
- max.in.flight.requests.per.connection 限制每个链接最多缓存的请求数量
- 更新元数据的时间