【发布时间】:2016-08-28 21:10:14
【问题描述】:
给定以下代码:
def createKafkaStream(ssc: StreamingContext,
kafkaTopics: String, brokers: String): DStream[(String, String)] = {
// some configs here
KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, props, topicsSet)
}
def consumerHandler(): StreamingContext = {
val ssc = new StreamingContext(sc, Seconds(10))
createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
rdd.foreach { msg =>
// Now do some DataFrame-intensive work.
// As I understand things, DataFrame ops must be run
// on Workers as well as streaming consumers.
}
})
ssc
}
StreamingContext.getActive.foreach {
_.stop(stopSparkContext = false)
}
val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()
我的理解是,Spark 和 Kafka 将自动协同工作,以确定有多少消费者线程部署到可用的 Worker 节点,这可能会导致并行处理来自 Kafka 主题的消息。 p>
但是如果我不想要多个并行消费者怎么办?如果希望 1-and-only-1 消费者读取来自主题的下一条消息,完全处理它,然后重新开始并轮询下一条消息。
另外,当我打电话时:
val ssc = new StreamingContext(sc, Seconds(10))
这是否意味着:
- 单个消费者线程将接收过去 10 秒内发布到主题的所有消息; 或
- 单个消费者线程将接收来自主题的下一条(单条)消息,并且每 10 秒轮询下一条消息?
【问题讨论】:
-
感谢您的反馈,您说了几句话:“对于初学者,您展示的伪代码无法生成有效的 Spark 应用程序(您无法在操作中执行 DataFrame 密集型工作) )." (1) 你是什么意思无法执行?我一直在运行确实调用DataFrame操作的操作,所以你只是说我不应该这样做吗?或者你的意思是说我几周来一直在做的事情应该是不可能的?!?
标签: scala apache-spark apache-kafka spark-streaming