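【Posted】: 2017-05-09 14:00:29
【Question】:
I'm new to Spark and Kafka, and I'm trying to get some Scala code (run as a Spark job) to act as a long-running process (not just a short-lived or scheduled task) that continuously polls a Kafka broker for messages. When it receives messages, I just want to print them to the console/STDOUT. Again, this needs to be a long-running process that basically (tries to) live forever.
After doing some digging, it seems like a StreamingContext is what I want to use. Here is my best attempt: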
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.storage._
import org.apache.spark.streaming.{StreamingContext, Seconds, Minutes, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
def createKafkaStream(ssc: StreamingContext, kafkaTopics: String, brokers: String): DStream[(String, String)] = {
  val topicsSet = kafkaTopics.split(",").toSet
  val props = Map(
    "bootstrap.servers" -> "my-kafka.example.com:9092",
    "metadata.broker.list" -> "my-kafka.example.com:9092",
    "serializer.class" -> "kafka.serializer.StringEncoder",
    "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, props, topicsSet)
}
def processEngine(): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(1))
  val topicStream = createKafkaStream(ssc, "mytopic", "my-kafka.example.com:9092").print()
  ssc
}
StreamingContext.getActive.foreach {
  _.stop(stopSparkContext = false)
}
val ssc1 = StreamingContext.getActiveOrCreate(processEngine)
ssc1.start()
ssc1.awaitTermination()
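When I run this, I get no exceptions or errors, but nothing seems to happen. I can confirm that there are messages on the topic. Any ideas as to where I'm going wrong?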
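One thing that may be worth checking (a guess, not something the post confirms): with the Kafka 0.8 direct stream used above, consumption starts from the latest offset of each partition unless `auto.offset.reset` is set to `smallest`, so messages that were already on the topic before the job started would not be printed. The props map also mixes producer and new-consumer settings that the 0.8 direct stream does not need, and the `brokers` argument is never used. The sketch below builds a leaner parameter map in plain Scala; `KafkaParamsSketch`, `topicSet`, `consumerParams`, and the `fromBeginning` flag are hypothetical names introduced only for illustration.

```scala
// Sketch only: builds the inputs for the Kafka 0.8 direct stream.
// The object and method names are illustrative and not part of Spark or Kafka.
object KafkaParamsSketch {

  /** Splits a comma-separated topic list, trimming whitespace and dropping blanks. */
  def topicSet(kafkaTopics: String): Set[String] =
    kafkaTopics.split(",").map(_.trim).filter(_.nonEmpty).toSet

  /**
   * Consumer parameters for the direct stream.
   * Assumption: the job should also see messages that were already on the
   * topic, so fromBeginning = true adds auto.offset.reset -> smallest.
   */
  def consumerParams(brokers: String, fromBeginning: Boolean): Map[String, String] = {
    val base = Map("metadata.broker.list" -> brokers)
    if (fromBeginning) base + ("auto.offset.reset" -> "smallest") else base
  }
}
```

Inside `createKafkaStream`, these could stand in for `props` and `topicsSet`, for example `KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, KafkaParamsSketch.consumerParams(brokers, fromBeginning = true), KafkaParamsSketch.topicSet(kafkaTopics))`. If output still does not appear, publishing a fresh message while the job is running would help tell an offset issue apart from a connectivity issue.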
【Comments】:
Tags: scala apache-spark apache-kafka spark-streaming