【Question title】: Spark Structured Streaming 2.2 and k-means
【Posted】: 2018-08-18 11:15:26
【Question】:

I am reading streaming data from a folder stored on HDFS. I have the following small piece of code:

// Convert text into a DataSet of LogEntry rows. Select the two columns we care about
  val df = rawData.flatMap(parseLog).select("ip", "status")
  df.isStreaming


  val kmeans = new KMeans().setK(2).setSeed(1L)
  val model = kmeans.fit(df)

  // Evaluate clustering by computing Within Set Sum of Squared Errors.
  val WSSSE = model.computeCost(df)
  println(s"Within Set Sum of Squared Errors = $WSSSE")

  // Shows the K-means result
  println("Cluster Centers: ")
  model.clusterCenters.foreach(println)

When I run the code above, I get the following error:

java.lang.IllegalArgumentException: Field "features" does not exist.
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
  at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:266)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.types.StructType.apply(StructType.scala:265)
  at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:40)
  at org.apache.spark.ml.clustering.KMeansParams$class.validateAndTransformSchema(KMeans.scala:93)
  at org.apache.spark.ml.clustering.KMeans.validateAndTransformSchema(KMeans.scala:254)
  at org.apache.spark.ml.clustering.KMeans.transformSchema(KMeans.scala:340)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:305)
  at StructuredStreaming$.main(<console>:189)
  ... 90 elided

I am completely stumped by this. Any help would be greatly appreciated.

Update

Following EmiCareOfCell44's answer, I made these changes:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val assembler = new VectorAssembler().setInputCols(Array("ip", "status")).setOutputCol("features")
val output = assembler.transform(df).select("features")
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(output)

The code now loads, but when I run it I get the following error:

java.lang.IllegalArgumentException: Data type StringType is not supported.
  at org.apache.spark.ml.feature.VectorAssembler$$anonfun$transformSchema$1.apply(VectorAssembler.scala:121)
  at org.apache.spark.ml.feature.VectorAssembler$$anonfun$transformSchema$1.apply(VectorAssembler.scala:117)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.ml.feature.VectorAssembler.transformSchema(VectorAssembler.scala:117)
  at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)
  at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler.scala:54)
  at StructuredStreaming$.main(<console>:129)
  ... 60 elided

I think this is getting closer and just needs some tweaking.

【Comments on the question】:

    Tags: scala apache-spark k-means apache-spark-mllib


    【Solution 1】:

    You have to use VectorAssembler first to build the "features" vector column that KMeans expects. For example:

      val assembler = new VectorAssembler().setInputCols(Array("ip", "status")).setOutputCol("features")
      val df2 = assembler.transform(df).select("features")
      val kmeans = new KMeans().setK(2).setSeed(1L)
      val model = kmeans.fit(df2)
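
The "Data type StringType is not supported" error reported in the question's update suggests that at least one of the input columns is a string, and VectorAssembler only accepts numeric, boolean, or vector columns. The following is a minimal sketch of one way around that, not a definitive fix. It assumes both `ip` and `status` arrive as strings, and it runs on a small static sample rather than the streaming source. The names `sample`, `ipToDouble`, `ipNum`, and `statusNum` are hypothetical and do not come from the original post.

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Reuses the existing session when run inside spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("kmeans-sketch").getOrCreate()
import spark.implicits._

// Hypothetical static sample standing in for the parsed log rows; both columns are strings.
val sample = Seq(
  ("10.0.0.1", "200"),
  ("10.0.0.2", "200"),
  ("192.168.1.7", "404"),
  ("192.168.1.9", "500")
).toDF("ip", "status")

// Assumption: every ip value is a well-formed dotted-quad IPv4 address.
// Packs the four octets into one number and returns it as a Double.
val ipToDouble = udf((ip: String) =>
  ip.split('.').foldLeft(0L)((acc, octet) => acc * 256 + octet.toLong).toDouble
)

// Turn both string columns into numeric columns before assembling.
val numeric = sample
  .withColumn("ipNum", ipToDouble(col("ip")))
  .withColumn("statusNum", col("status").cast("double"))

val assembler = new VectorAssembler()
  .setInputCols(Array("ipNum", "statusNum"))
  .setOutputCol("features")

val features = assembler.transform(numeric).select("features")
val model = new KMeans().setK(2).setSeed(1L).fit(features)

model.clusterCenters.foreach(println)
```

Whether a packed IP number is a meaningful clustering feature is a modeling decision. The sketch only shows how to get past the type check.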
    

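    【Discussion】:

    • I added the code with the following imports: import org.apache.spark.ml.feature.VectorAssembler import org.apache.spark.ml.linalg.Vectors. I now get the following error when loading the Scala script: :317: error: type mismatch; found: org.apache.spark.ml.feature.VectorAssembler required: org.apache.spark.sql.Dataset[_] val model = kmeans.fit(assembler)
    • See my other answer. I am still getting the java.lang.IllegalArgumentException: Data type StringType is not supported. at org.apache.spark.ml.feature.VectorAssembler$$anonfun$transformSchema$1.apply(VectorAssembler.scala:121) error. I really appreciate the help you are providing. I feel like this is getting closer.
    • @user22 Please don't post errors as answers. Edit the original question instead.
    • @cricket_007 Fixed. I didn't know the correct procedure.
    • @cricket_007 "You need to convert the columns to numeric features." How do I do that? The IP addresses and status codes are already numbers.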
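
On the last comment: values parsed out of a text log look like numbers but are typically typed as strings in the DataFrame schema, which would match the StringType error. As a rough illustration of what "convert to numeric" can mean for an IP address, here is a plain-Scala sketch with no Spark dependency. The helper name `ipToLong` is hypothetical, and the approach assumes IPv4 dotted-quad input.

```scala
import scala.util.Try

// Hypothetical helper: parse a dotted-quad IPv4 string into a Long.
// Returns None when the input does not have exactly four octets in 0..255.
def ipToLong(ip: String): Option[Long] = {
  val octets = ip.trim.split('.').toList
    .flatMap(part => Try(part.toInt).toOption)
    .filter(o => o >= 0 && o <= 255)
  if (octets.length != 4) None
  else Some(octets.foldLeft(0L)((acc, o) => acc * 256 + o))
}

println(ipToLong("10.0.0.1"))
println(ipToLong("not-an-ip"))
```

A function like this could be wrapped in a Spark UDF to produce a numeric column, while a status code held as a string can usually be handled with a plain column cast.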