【问题标题】:Spark Streaming: foreachRDD update my mongo RDDSpark Streaming:foreachRDD 更新我的 mongo RDD
【发布时间】:2016-04-21 06:09:04
【问题描述】:

我想在每次进入foreachRDD 时创建一个新的 mongodb RDD。但是我有序列化问题:

 mydstream  
   .foreachRDD(rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      // ssc is my StreamingContext
      val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) })

这会给我一个错误:

object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@31133b6e)

有什么想法吗?

【问题讨论】:

  • SparkContext 不可序列化,因此不能在任何转换或动作方法中使用,只能在驱动程序类中使用。
  • 在 foreachRDD 方法中将 list 转换为 rdd 有什么具体原因吗?

标签: mongodb apache-spark spark-streaming


【解决方案1】:

据我了解,如果你有一个“不可序列化”的对象,你必须添加它,你需要通过foreachPartition 传递它,这样你就可以在运行处理之前连接到每个节点上的数据库。

mydstream.foreachRDD(rdd => {
        rdd.foreachPartition{
          val mongoClient = MongoClient("localhost", 27017)
          val db = mongoClient(mongoDatabase)
          val coll = db(mongoCollection)
          // ssc is my StreamingContext
          val modelsRDDRaw = ssc.sparkContext.parallelize(coll.find().toList) }})

【讨论】:

  • 您可以尝试在 rdd.foreachPartition val ssc = StreamingContext.getOrCreate(checkpointdirectory, functionToCreateContext _) 之前的 foreachRDD 中创建您的 ssc
【解决方案2】:

您可以尝试使用返回 SparkContext 或 SparkStreamingContext(如果 rdd 是 DStream)的 rdd.context

mydstream foreachRDD { rdd => {
      val mongoClient = MongoClient("localhost", 27017)
      val db = mongoClient(mongoDatabase)
      val coll = db(mongoCollection)
      val modelsRDDRaw = rdd.context.parallelize(coll.find().toList) })

其实RDD好像也有.sparkContext的方法。老实说,我不知道有什么区别,也许它们是别名(?)。

【讨论】:

    猜你喜欢
    • 2023-03-17
    • 2014-07-23
    • 2022-07-05
    • 2021-12-14
    • 1970-01-01
    • 2018-08-09
    • 1970-01-01
    • 1970-01-01
    • 2016-01-20
    相关资源
    最近更新 更多