转发请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/7767621.html  

本文所研究的spark-streaming代码版本为2.3.0-SNAPSHOT

 spark-streaming为了匹配0.10以后版本的kafka客户端变化推出了一个目前还是Experimental状态的spark-streaming-kafka-0-10客户端,由于老的0.8版本无法支持kerberos权限校验,需要研究下spark-streaming-kafka-0-10的源码实现以及系统架构。

 首先看下初始化kafkastream的方法声明,

def createDirectStream[K, V](
ssc: StreamingContext,
locationStrategy: LocationStrategy,
consumerStrategy: ConsumerStrategy[K, V],
perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]] = {
new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}

DirectKafkaInputDStream的初始化参数包括StreamingContext,LocationStrategy,ConsumerStrategy和perPartitionConfig,根据源码文档locationStrategy一般采用PreferConsistent

,perPartitionConfig一般采用默认实现,这里不做研究,主要会有点区别的参数为consumerStrategy,它的作用会在下面的源码分析里展示出来。

 

一  driver consumer

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
                .createDirectStream(jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String> Subscribe(topics,
                                kafkaParams));
View Code

相关文章:

  • 2021-12-08
  • 2022-12-23
  • 2021-08-07
  • 2022-01-10
  • 2022-12-23
  • 2021-08-12
  • 2022-12-23
  • 2021-08-07
猜你喜欢
  • 2022-12-23
  • 2021-06-09
  • 2021-09-06
  • 2021-12-18
  • 2021-12-26
  • 2022-12-23
  • 2022-12-23
相关资源
相似解决方案