【Question title】: Spark UnionRDD cannot be cast to HasOffsetRanges
【Posted】: 2017-11-27 20:13:01
【Question】:

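I receive messages from several different Kafka topics, so I use the StreamingContext.union method to merge the streams. However, I run into a problem when I try to update the Kafka offsets in ZooKeeper.

The error is as follows: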

java.lang.ClassCastException: org.apache.spark.rdd.UnionRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
at com.qingqing.spark.util.KafkaManager.updateZKOffsets(KafkaManager.scala:75)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:43)
at com.qingqing.spark.BinlogConsumer$$anonfun$consumeBinlog$3.apply(BinlogConsumer.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

My code is as follows:

Can anyone help me find the problem? Thanks in advance.

【Comments】:

Tags: scala apache-spark apache-kafka rdd classcastexception


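【Answer 1】:

This is not really how the direct streaming API for Kafka and Spark Streaming is meant to be used. With the direct approach there are no receivers, so you do not need multiple streams; a single stream that consumes all the topics is enough. Collect the offsets for every topic you want to consume, then create a single DirectKafkaInputDStream instance via createDirectStream. Since I don't have your code, a rough sketch looks like this: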

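import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val offsets: Map[TopicAndPartition, Long] =
  topics.map { /* create the (TopicAndPartition, Long) tuple here for each topic */ }.toMap
// Assumes String keys and values; swap the type parameters and decoders for other types.
val kafkaInputStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, offsets, (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))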

For topics that have no stored offsets yet, simply start from offset 0.
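
One way to build that offsets map is sketched below in plain Scala, so it runs without Kafka on the classpath. `TopicAndPartition` here is a local stand-in for `kafka.common.TopicAndPartition`, and `StartingOffsets`, `startingOffsets`, `partitionsByTopic` and `storedOffsets` are hypothetical names. How you read the saved offsets from ZooKeeper and discover the partition counts depends on your setup.

```scala
// Local stand-in for kafka.common.TopicAndPartition (assumption: same two fields).
final case class TopicAndPartition(topic: String, partition: Int)

object StartingOffsets {
  // Hypothetical helper: merges previously saved offsets (e.g. read from ZooKeeper)
  // with a default of 0L for every partition that has no saved offset.
  def startingOffsets(
      partitionsByTopic: Map[String, Int],
      storedOffsets: Map[TopicAndPartition, Long]
  ): Map[TopicAndPartition, Long] =
    (for {
      (topic, partitionCount) <- partitionsByTopic.toSeq
      partition <- 0 until partitionCount
      tp = TopicAndPartition(topic, partition)
    } yield tp -> storedOffsets.getOrElse(tp, 0L)).toMap
}
```

The resulting map has one entry per partition of every topic, which is the shape the `fromOffsets` argument of `createDirectStream` expects.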

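As for HasOffsetRanges, the cast has to be done directly on the stream returned by createDirectStream, before any other transformation, because only the underlying Kafka RDD actually implements that trait. You need to call transform on the stream right away: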

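val streamAfterTransform = kafkaInputStream.transform { rdd =>
  // Only the RDD produced directly by the Kafka stream implements HasOffsetRanges
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Do stuff with ranges
  rdd // transform must return an RDD
}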
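
Why the cast in the question fails can be seen in a toy model of the type hierarchy. The classes below are simplified stand-ins, not Spark's real ones, and `CastDemo` and `rangesOf` are hypothetical names. In Spark, the RDD produced by the direct stream mixes in `HasOffsetRanges`, while the `UnionRDD` produced by `union` is a plain `RDD`, so the offset ranges can no longer be reached through a cast once the streams are merged.

```scala
import scala.util.Try

// Simplified stand-ins for Spark's types (assumption: the names mirror the real ones,
// the bodies do not).
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
trait HasOffsetRanges { def offsetRanges: Array[OffsetRange] }

class RDD
// Like the RDD behind a direct Kafka stream: carries its offset ranges.
class KafkaRDD(val offsetRanges: Array[OffsetRange]) extends RDD with HasOffsetRanges
// Like the result of union: wraps its parents but does not mix in the trait.
class UnionRDD(val parents: Seq[RDD]) extends RDD

object CastDemo {
  // Attempts the same cast as in the question and reports whether it succeeded.
  def rangesOf(rdd: RDD): Try[Array[OffsetRange]] =
    Try(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
}
```

Calling `CastDemo.rangesOf` on a `KafkaRDD` succeeds, while calling it on a `UnionRDD` built from the same parents fails with a `ClassCastException`, which mirrors the stack trace in the question.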

【Comments】:
