【发布时间】:2020-04-07 02:41:18
【问题描述】:
我正在尝试从 kafka 流中读取数据,对其进行处理并将其保存到报告中。我想每天运行一次这项工作。我正在使用 dStreams。 dStreams 中是否有等效的 trigger(Trigger.Once) 可以用于这种情况。感谢您的建议和帮助。
def main(args: Array[String]) {
val spark = ...
val ssc = new StreamingContext(sc, Seconds(jobconfig.getLong("batchInterval")))
val kafkaStream =
KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(jobconfig.getString("topic")), kafkaParams))
kafkaStream.foreachRDD(rdd => {
.
.
.
.
}
sqlContext.clearCache()
ssc.start()
ssc.awaitTermination()
}
【问题讨论】:
标签: apache-spark spark-streaming dstream