【问题标题】:Kafka Spark Streaming multiple aggregationKafka Spark Streaming 多重聚合
【发布时间】:2016-11-16 05:57:28
【问题描述】:

我正在使用 Spark Kafka Integration 0.10,我需要对流进行两级聚合:

  1. 第一个是每分钟间隔
  2. 另一种是以 15 分钟为间隔求和。

另外,首选是累积一分钟的间隔值,然后在超过 15 分钟时将其重置 b/c 应该保留 15 分钟的值。

在不同的滑动窗口上有两个reduceByKeysByWindows 不起作用,因为它会给出KafkaConcurrentModification 异常。

【问题讨论】:

标签: apache-spark apache-kafka spark-streaming


【解决方案1】:

tl;dr 似乎有效。请提供一个失败的例子。

我使用的是 Spark 2.0.2(发布于 today)。

我的例子如下(为简洁起见,删除了一些代码):

val ssc = new StreamingContext(sc, Seconds(10))
import org.apache.spark.streaming.kafka010._

val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

def reduceFunc(v1: String, v2: String) = s"$v1 + $v2"
dstream.map { r =>
  println(s"value: ${r.value}")
  val Array(key, value) = r.value.split("\\s+")
  println(s">>> key = $key")
  println(s">>> value = $value")
  (key, value)
}.reduceByKeyAndWindow(
  reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10))
  .print()

dstream.foreachRDD { rdd =>
  // Get the offset ranges in the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
  }
}

ssc.start

看到遇到的异常,您会进行哪些更改?

整个项目可通过spark-streaming-kafka-direct 获得。

【讨论】:

    猜你喜欢
    • 2017-04-22
    • 1970-01-01
    • 2019-08-08
    • 1970-01-01
    • 1970-01-01
    • 2016-03-12
    • 2018-05-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多