【问题标题】:Streaming K-means Spark Scala: Getting java.lang.NumberFormatException for input stringStreaming K-means Spark Scala:获取输入字符串的 java.lang.NumberFormatException
【发布时间】:2018-12-31 15:58:35
【问题描述】:

当我从包含双值的目录中读取 CSV 数据并在其上应用流式 K-means 模型时,如下所示,

//CSV 文件

40.729,-73.9422
40.7476,-73.9871
40.7424,-74.0044
40.751,-73.9869
40.7406,-73.9902
.....

//SBT 依赖:

name := "应用程序名称"

版本 := "0.1"

scalaVersion := "2.11.12"
val sparkVersion =“2.3.1”

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" % "spark-streaming_2.11" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % "2.3.1")

//导入语句

导入 org.apache.spark.sql.{DataFrame, SparkSession}
导入 org.apache.spark.sql.streaming.OutputMode
导入 org.apache.spark.sql.types._
导入 org.apache.spark.{SparkConf, SparkContext, rdd}
导入 org.apache.spark.streaming.{Seconds, StreamingContext}
导入 org.apache.spark.mllib.clustering.{ KMeans,StreamingKMeans}
导入 org.apache.spark.mllib.linalg.Vectors

//读取Csv数据

val trainingData = ssc.textFileStream ("directory path") 
                      .map(x=>x.toDouble)
                      .map(x=>Vectors.dense(x))
// applying Streaming kmeans model
val model = new StreamingKMeans()
  .setK(numClusters)
  .setDecayFactor(1.0)
  .setRandomCenters(numDimensions, 0.0)
model.trainOn(trainingData)

我收到以下错误:

18/07/24 11:20:04 错误执行程序:阶段 2.0 中任务 0.0 中的异常 (TID 1) java.lang.NumberFormatException:对于输入字符串:“40.7473,-73.9857”在 sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) 在 sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 在 java.lang.Double.parseDouble(Double.java:538) 在 scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:285) 在 scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) 在 ubu$$anonfun$1.apply(uberclass.scala:305) 在 ubu$$anonfun$1.apply(uberclass.scala:305) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193) 在 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 在 org.apache.spark.scheduler.Task.run(Task.scala:109) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 线程中的异常 “streaming-job-executor-0”java.lang.Error: java.lang.InterruptedException 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)

有人可以帮忙吗?

【问题讨论】:

  • 错误信息很清楚,字符串"40.7473,-73.9857"不是数字。它是用逗号分隔的两个数字。你需要split(",")它。
  • 感谢您的回复。@jwvh 但是使用以下拆分功能后:` var trainingData = ssc.textFileStream("目录路径").map(x=>x.split(',') .map(_.toDouble)).map(x=>Vectors.dense(x)) `它给出了这个错误:java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:212)
  • 我只是在猜测,但我很怀疑您是否将正确的类型传递给 Vectors.dense() 方法。你可以试试:.map(_.split(',').map(x=>Vectors.dense(x.toDouble)))
  • @jwvh 我按照你的建议做了。但它会导致类型为 DStream[Array[Vector]] 的 Dstream 向量不被 Streaming K-means 的 trainOn 方法所接受。它需要 DStream[Vector] 类型的流数据
  • 您在 predictOnValues 中得到正确的结果了吗?

标签: scala streaming spark-streaming k-means


【解决方案1】:

存在尺寸问题。传递给流式 K-means 模型的 vectornumDimensiondimension 应该相同。

【讨论】:

  • 数据的维度:setRandomCenters的第一个参数(应该和特征个数一样)
猜你喜欢
  • 2018-10-05
  • 2014-09-30
  • 1970-01-01
  • 2012-12-05
  • 2013-09-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多