【问题标题】:Accessing Collection of DStreams访问 DStream 集合
【发布时间】:2017-12-07 21:34:23
【问题描述】:

我正在尝试访问在此问题的解决方案中获得的过滤 DStream 集合:Spark Streaming - Best way to Split Input Stream based on filter Param

我按如下方式创建集合:

val statuCodes = Set("200","500", "404")
    spanTagStream.cache()
    val statusCodeStreams = statuCodes.map(key => key -> spanTagStream.filter(x => x._3.get("http.status_code").getOrElse("").asInstanceOf[String].equals(key)))

我尝试通过以下方式访问statusCodeStreams

for(streamTuple <- statusCodeStreams){
      streamTuple._2.foreachRDD(rdd =>
  rdd.foreachPartition(
      partitionOfRecords =>
        {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            val producer = new KafkaProducer[String,String](props)

            partitionOfRecords.foreach
            {
                 x=>{ 
                 /* Code Writing to Kafka using streamTuple._1 as the topic-String */
                 }
            }
      })
   )
}

执行此操作时,我收到以下错误: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects

如何访问 Streams 以以可序列化的方式写入 Kafka?

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-streaming


    【解决方案1】:

    正如异常所示,DStream 定义正在被闭包捕获。 一个简单的选择是声明这个DStream 瞬态:

    @transient val spamTagStream = //KafkaUtils.create...
    

    @transient 标记要从某个对象的对象图的 Java 序列化中删除的某些对象。这种情况的关键是在闭包中使用了一些在与DStream(本例中为statusCodeStreams)相同范围内声明的val。闭包中val 的实际引用是outer.statusCodeStreams,导致序列化过程将outer 的所有上下文“拉”到闭包中。使用@transient,我们将 DStream(以及 StreamingContext)声明标记为不可序列化,并且避免了序列化问题。根据代码结构(如果它在一个 main 函数中都是线性的(不好的做法,顺便说一句),则可能需要将 ALL DStream 声明 + StreamingContext 实例标记为 @transient

    如果初始过滤的唯一目的是将内容“路由”到单独的 Kafka 主题,则可能值得在 foreachRDD 中移动过滤。这将使程序结构更简单。

    spamTagStream.foreachRDD{ rdd => 
        rdd.cache()
        statuCodes.map{code =>
            val matchingCodes = rdd.filter(...)
            matchingCodes.foreachPartition{write to kafka}
        }
        rdd.unpersist(true)
    }
    

    【讨论】:

    • 使用@transient的优点/缺点是什么?当我进行转换直到最终到达spanTagStream 时,这是否必须在流初始化时进行?关于场景:这只是我已经拥有的一些可以立即利用 Collection 的代码片段。其他用例将按原样使用 Stream 或在一定时间内写入 RDD 以训练某些机器学习算法,或提供数据点以与训练模型进行比较。
    • @LST 为问题添加了对transient 的解释(评论太长了,我认为无论如何它对问题的未来参考都很有价值)-@transient 标记需要在每个 DStream 值分配。通过在单独的类/对象中构造代码并注意闭包的序列化范围,可以避免很多事情。
    • 您是否有资源(可能是您知道的 git 项目或类似的项目)以及如何最好地构建应用程序以避免上下文问题并且不需要将所有内容标记为临时的?我还处于起步阶段,主要是从 Spark 主页上的教程中学习并从那里开始的。所以我基本上还是在main方法中做所有事情。
    • @LST 我会建议你一本书,但会是公然的自我推销 :-) 一些帮助你入门的指南:单独初始化流上下文并将其作为参数传递。将函数与数据流分开:创建一个函数process(proc: RDD =&gt; Unit) 并将其用作foreachRDD{process},而不是一个巨大的foreachRDD{....}。这样做,您的依赖关系变得清晰,b/c 您将需要调用例如process(proc: RDD =&gt; Unit, filter:Set[String]) 传递密钥并避免关闭蠕变。
    • 关于结构化的好建议 - 我将拆分移至 RDD 并将应用程序结构化为更多方法调用。现在一切都可以在没有单个@transient 的情况下运行。我什至设法过滤 RDD,将它们分配给状态集,然后遍历该集以进行最终处理。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-08-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多