【发布时间】:2018-12-31 15:58:35
【问题描述】:
当我从包含双值的目录中读取 CSV 数据并在其上应用流式 K-means 模型时,如下所示,
//CSV 文件
40.729,-73.9422
40.7476,-73.9871
40.7424,-74.0044
40.751,-73.9869
40.7406,-73.9902
.....
//SBT 依赖:
name := "应用程序名称"
版本 := "0.1"
scalaVersion := "2.11.12"
val sparkVersion =“2.3.1”libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" % "spark-streaming_2.11" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % "2.3.1")
//导入语句
导入 org.apache.spark.sql.{DataFrame, SparkSession}
导入 org.apache.spark.sql.streaming.OutputMode
导入 org.apache.spark.sql.types._
导入 org.apache.spark.{SparkConf, SparkContext, rdd}
导入 org.apache.spark.streaming.{Seconds, StreamingContext}
导入 org.apache.spark.mllib.clustering.{ KMeans,StreamingKMeans}
导入 org.apache.spark.mllib.linalg.Vectors
//读取Csv数据
val trainingData = ssc.textFileStream ("directory path") .map(x=>x.toDouble) .map(x=>Vectors.dense(x)) // applying Streaming kmeans model val model = new StreamingKMeans() .setK(numClusters) .setDecayFactor(1.0) .setRandomCenters(numDimensions, 0.0) model.trainOn(trainingData)
我收到以下错误:
18/07/24 11:20:04 错误执行程序:阶段 2.0 中任务 0.0 中的异常 (TID 1) java.lang.NumberFormatException:对于输入字符串:“40.7473,-73.9857”在 sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) 在 sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 在 java.lang.Double.parseDouble(Double.java:538) 在 scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:285) 在 scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) 在 ubu$$anonfun$1.apply(uberclass.scala:305) 在 ubu$$anonfun$1.apply(uberclass.scala:305) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 scala.collection.Iterator$$anon$11.next(Iterator.scala:410) 在 org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193) 在 org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) 在 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) 在 org.apache.spark.scheduler.Task.run(Task.scala:109) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748) 线程中的异常 “streaming-job-executor-0”java.lang.Error: java.lang.InterruptedException 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1155) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread.run(Thread.java:748)
有人可以帮忙吗?
【问题讨论】:
-
错误信息很清楚,字符串
"40.7473,-73.9857"不是数字。它是用逗号分隔的两个数字。你需要split(",")它。 -
感谢您的回复。@jwvh 但是使用以下拆分功能后:` var trainingData = ssc.textFileStream("目录路径").map(x=>x.split(',') .map(_.toDouble)).map(x=>Vectors.dense(x)) `它给出了这个错误:
java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:212) -
我只是在猜测,但我很怀疑您是否将正确的类型传递给
Vectors.dense()方法。你可以试试:.map(_.split(',').map(x=>Vectors.dense(x.toDouble))) -
@jwvh 我按照你的建议做了。但它会导致类型为 DStream[Array[Vector]] 的 Dstream 向量不被 Streaming K-means 的 trainOn 方法所接受。它需要 DStream[Vector] 类型的流数据
-
您在 predictOnValues 中得到正确的结果了吗?
标签: scala streaming spark-streaming k-means