【发布时间】:2018-08-01 04:21:07
【问题描述】:
我有从 Kafka 通过 DStream 到达的数据。我想进行特征提取以获得一些关键字。
我不想等待所有数据的到达(因为它旨在成为可能永远不会结束的连续流),所以我希望分块执行提取 - 准确性是否会受到影响对我来说并不重要有点。
到目前为止,我整理了类似的东西:
def extractKeywords(stream: DStream[Data]): Unit = {
val spark: SparkSession = SparkSession.builder.getOrCreate
val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData
streamWithFeatures.print()
}
def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
val df = spark.createDataFrame(rdd).toDF("data", "words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)
val rescaledData = idfModel.transform(rawFeature)
import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}
但是,我收到了java.lang.IllegalStateException: Haven't seen any document yet. - 我并不感到惊讶,因为我只是尝试将东西拼凑在一起,而且我知道由于我不等待某些数据的到来,所以当我尝试时生成的模型可能是空的在数据上使用它。
解决这个问题的正确方法是什么?
【问题讨论】:
标签: scala apache-spark feature-extraction dstream