【发布时间】:2016-10-25 13:58:15
【问题描述】:
我有一个从 Kafka 流出的 Spark 消费者。 我正在尝试管理精确一次语义的偏移量。
但是,在访问偏移量时会引发以下异常:
"java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD 无法转换为 org.apache.spark.streaming.kafka.HasOffsetRanges"
执行此操作的代码部分如下:
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
这里的 dataStream 是使用 KafkaUtils API 创建的直接流(DStream[String]),类似于:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
如果有人可以帮助我理解我在这里做错了什么。 transform 是官方文档中提到的在数据流上执行的方法链中的第一个方法
谢谢。
【问题讨论】:
标签: scala apache-spark apache-kafka spark-streaming rdd