转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html
本文所研究的spark-streaming代码版本为2.3.0-SNAPSHOT
spark-streaming为了匹配0.10以后版本的kafka客户端变化推出了一个目前还是Experimental状态的spark-streaming-kafka-0-10客户端,由于老的0.8版本无法支持kerberos权限校验,需要研究下spark-streaming-kafka-0-10的源码实现以及系统架构。
首先看下初始化kafkastream的方法声明,
def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}
DirectKafkaInputDStream的初始化参数包括StreamingContext,LocationStrategy,ConsumerStrategy和perPartitionConfig,根据源码文档locationStrategy一般采用PreferConsistent
,perPartitionConfig一般采用默认实现,这里不做研究,主要会有点区别的参数为consumerStrategy,它的作用会在下面的源码分析里展示出来。
一 driver consumer
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils .createDirectStream(jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));