【问题标题】:Kafka Spark Streaming multiple aggregationKafka Spark Streaming 多重聚合
【发布时间】:2016-11-16 05:57:28
【问题描述】:
我正在使用 Spark Kafka Integration 0.10,我需要对流进行两级聚合:
- 第一个是每分钟间隔
- 另一种是以 15 分钟为间隔求和。
另外,首选是累积一分钟的间隔值,然后在超过 15 分钟时将其重置 b/c 应该保留 15 分钟的值。
在不同的滑动窗口上有两个reduceByKeysByWindows 不起作用,因为它会给出KafkaConcurrentModification 异常。
【问题讨论】:
标签:
apache-spark
apache-kafka
spark-streaming
【解决方案1】:
tl;dr 似乎有效。请提供一个失败的例子。
我使用的是 Spark 2.0.2(发布于 today)。
我的例子如下(为简洁起见,删除了一些代码):
val ssc = new StreamingContext(sc, Seconds(10))
import org.apache.spark.streaming.kafka010._
val dstream = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))
def reduceFunc(v1: String, v2: String) = s"$v1 + $v2"
dstream.map { r =>
println(s"value: ${r.value}")
val Array(key, value) = r.value.split("\\s+")
println(s">>> key = $key")
println(s">>> value = $value")
(key, value)
}.reduceByKeyAndWindow(
reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10))
.print()
dstream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
}
}
ssc.start
看到遇到的异常,您会进行哪些更改?
整个项目可通过spark-streaming-kafka-direct 获得。