【问题标题】:How to use feature extraction with DStream in Apache Spark如何在 Apache Spark 中使用 DStream 进行特征提取
【发布时间】:2018-08-01 04:21:07
【问题描述】:

我有从 Kafka 通过 DStream 到达的数据。我想进行特征提取以获得一些关键字。

我不想等待所有数据的到达(因为它旨在成为可能永远不会结束的连续流),所以我希望分块执行提取 - 准确性是否会受到影响对我来说并不重要有点。

到目前为止,我整理了类似的东西:

def extractKeywords(stream: DStream[Data]): Unit = {

  val spark: SparkSession = SparkSession.builder.getOrCreate

  val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData

  val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _

  val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData

  streamWithFeatures.print()
}

def extractFeatures(spark: SparkSession)
                   (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {

  val df = spark.createDataFrame(rdd).toDF("data", "words")

  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
  val rawFeatures = hashingTF.transform(df)

  val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
  val idfModel = idf.fit(rawFeatures)

  val rescaledData = idfModel.transform(rawFeature)

  import spark.implicits._
  rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}

但是,我收到了java.lang.IllegalStateException: Haven't seen any document yet. - 我并不感到惊讶,因为我只是尝试将东西拼凑在一起,而且我知道由于我不等待某些数据的到来,所以当我尝试时生成的模型可能是空的在数据上使用它。

解决这个问题的正确方法是什么?

【问题讨论】:

    标签: scala apache-spark feature-extraction dstream


    【解决方案1】:

    我使用了 cmets 的建议,并将程序分成 2 次运行:

    • 计算IDF模型并将其保存到文件

      def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = {
        val session: SparkSession = SparkSession.builder.getOrCreate
      
        val wordsDf = session.createDataFrame(rdd).toDF("data", "words")
      
        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
        val featurizedDf = hashingTF.transform(wordsDf)
      
        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
        val idfModel = idf.fit(featurizedDf)
      
        idfModel.write.save(idfModelFile.getAbsolutePath)
      }
      
    • 从文件中读取 IDF 模型并简单地在所有传入信息上运行它

      val idfModel = IDFModel.load(idfModelFile.getAbsolutePath)
      
      val documentDf = spark.createDataFrame(rdd).toDF("update", "document")
      
      val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words")
      val wordsDf = tokenizer.transform(documentDf)
      
      val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
      val featurizedDf = hashingTF.transform(wordsDf)
      
      val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features")
      val featuresDf = extractor.transform(featurizedDf)
      
      featuresDf.select("update", "features")
      

    【讨论】:

      猜你喜欢
      • 2018-07-18
      • 2019-06-07
      • 2020-08-06
      • 2012-10-19
      • 2015-10-30
      • 2017-07-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多