【Question title】: Apache Spark: how to create a counter
【Posted】: 2016-05-20 17:13:00
【Question description】:

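I am taking my first steps with Scala, using the Cloudera VM.

I have a CSV file delimited by ;. I want to split each line and create a val loans with a sequential counter. I wrote a map to do both.

My code is below: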

scala> val loans: RDD[(VertexId, ComplaintNodeDate)] =
 |  sc.textFile("/home/cloudera/complaints_loan.csv").filter(!_.startsWith("DateReceived")).
 | map {line => 
 | val row = line split ';'
 | var initialValue1 = initialValue2 + 1L
 |  initialValue2 = initialValue1
 | (initialValue2, ComplaintLoan(row(0),row(1), row(2), row(3), row(4)))
 | }

I get the following error:

 java.io.IOException: Failed to create local dir in /tmp/spark-3940587c-c7b4-460c-be02-02660ed17f05/blockmgr-d5286d12-401a-4d68-b8b9-7654d319800d/21.
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:73)
at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:167)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:404)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:805)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:637)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:991)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:98)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1051)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:761)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:589)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:62)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
at $iwC$$iwC$$iwC.<init>(<console>:78)
at $iwC$$iwC.<init>(<console>:80)
at $iwC.<init>(<console>:82)
at <init>(<console>:84)
at .<init>(<console>:88)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:874)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


 scala> 

Is my code correct?

【Comments】:

    Tags: scala dictionary apache-spark counter cloudera-cdh


    【Solution 1】:

    1) On Cloudera you have to run spark-shell / spark-submit as the user spark:

    sudo -u spark spark-shell
    

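    2) By default, Spark reads data from HDFS, so the file has to be uploaded first (using the hdfs command-line tool or the Hue file browser). If you want to read the data from the driver's local file system, specify the protocol explicitly: file:///home/data/file.txt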

    3) Use zipWithIndex on your RDD to get the line numbers:

    loans.zipWithIndex()
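
A fuller sketch of step 3, meant to be pasted into spark-shell (where `sc` already exists). The `ComplaintLoan` field names, the header columns and the sample rows are made up for illustration, since the question does not show them; in the real job the lines would come from `sc.textFile(...)` rather than `sc.parallelize`. Note that `zipWithIndex` starts at 0, so 1 is added here to get ids starting at 1.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.VertexId

// Hypothetical record type: the question's own ComplaintLoan definition is not shown.
case class ComplaintLoan(dateReceived: String, product: String, subProduct: String,
                         issue: String, company: String)

// Made-up sample data standing in for sc.textFile("file:///home/cloudera/complaints_loan.csv").
val raw: RDD[String] = sc.parallelize(Seq(
  "DateReceived;Product;SubProduct;Issue;Company",
  "2016-01-04;Consumer Loan;Vehicle loan;Managing the loan;Acme Bank",
  "2016-01-05;Consumer Loan;Installment loan;Taking out the loan;Example Credit",
  "2016-01-06;Consumer Loan;Vehicle lease;Problems when unable to pay;Sample Finance"
), 2)

// Drop the header line, as in the question.
val lines: RDD[String] = raw.filter(!_.startsWith("DateReceived"))

// zipWithIndex pairs every element with its position (0-based, as a Long),
// so no mutable counter is needed inside the closure.
val loans: RDD[(VertexId, ComplaintLoan)] =
  lines.zipWithIndex().map { case (line, idx) =>
    val row = line.split(";", -1) // -1 keeps trailing empty fields
    (idx + 1L, ComplaintLoan(row(0), row(1), row(2), row(3), row(4)))
  }

loans.collect().foreach(println)
```

The ids follow the order of the lines in the RDD. If the ids only need to be unique and not consecutive, `zipWithUniqueId()` is an alternative that avoids the extra Spark job `zipWithIndex` may run to count the partitions.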
    

    【Comments】:

      【Solution 2】:

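      I am not sure a counter can be maintained through a local variable, but if you want to attach an id to your records, you can create the ids separately, zip them with your loans RDD, and then transform the result into what you want.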

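      val numOfPartitions = 8
      // Read the CSV and drop the header line
      val loans = sc.textFile("/home/cloudera/complaints_loan.csv", numOfPartitions).
                     filter(!_.startsWith("DateReceived"))
      
      // Build the ids 1..n separately and zip them with the lines.
      // Note: RDD.zip requires both RDDs to have the same number of partitions
      // and the same number of elements in each partition.
      val loansWithId = sc.parallelize(1 to loans.count.toInt, numOfPartitions).
                           zip(loans).map { line =>
                             val row = line._2.split(';')
                             (line._1, ComplaintLoan(row(0), row(1), row(2), row(3), row(4)))
                           }
      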

      Hope this helps!
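
On the open point about keeping a counter in a local variable: a variable declared on the driver is copied into each task's closure, so incrementing it inside `map` does not give one global sequence. A counter that is local to a partition is fine, though, as long as each partition is shifted by the number of elements in the partitions before it. The following is a minimal sketch of that idea for spark-shell, with made-up sample lines; it illustrates the principle and is not a claim about how any Spark method is implemented internally.

```scala
import org.apache.spark.rdd.RDD

// Made-up sample lines, spread over 2 partitions.
val lines: RDD[String] = sc.parallelize(
  Seq("a;b;c;d;e", "f;g;h;i;j", "k;l;m;n;o", "p;q;r;s;t"), 2)

// 1) Count the elements of each partition (this runs one Spark job).
val counts: Array[Long] =
  lines.mapPartitions(it => Iterator(it.size.toLong)).collect()

// 2) Prefix sums: offsets(i) is the number of elements in partitions 0 until i.
val offsets: Array[Long] = counts.scanLeft(0L)(_ + _)

// 3) Number the elements inside each partition and shift by the partition's offset.
val withId: RDD[(Long, String)] =
  lines.mapPartitionsWithIndex { (partitionIdx, it) =>
    it.zipWithIndex.map { case (line, i) =>
      (offsets(partitionIdx) + i + 1L, line)
    }
  }

withId.collect().foreach(println)
```

This also sidesteps the partition-layout requirement of `zip`, because the ids are generated inside the partitions of the same RDD.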

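      【Comments】:

      • Thank you very much. I tried your code, but when I create loanWithID I get the following message: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://quickstart.cloudera:8020/home/cloudera/complaints_loan.csv. Can you help me fix this error? I use Cloudera V
      • As the error says, it cannot find the complaints file. Try this command before starting spark-shell or running spark-submit: "hadoop fs -put localPathTo/complaints_loan.csv /home/cloudera".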
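
To see in advance which file system a path resolves to, and whether the file is actually there, a check along these lines can be run in spark-shell before calling `sc.textFile`. This is a sketch using the Hadoop `FileSystem` API; the helper name `pathExists` is hypothetical, and the paths are the ones quoted in this thread.

```scala
import org.apache.hadoop.fs.Path

// Hypothetical helper: resolves the path with Spark's Hadoop configuration
// and reports whether it exists on the file system it resolves to.
def pathExists(location: String): Boolean = {
  val path = new Path(location)
  val fs = path.getFileSystem(sc.hadoopConfiguration)
  println(s"$location resolves to file system ${fs.getUri}")
  fs.exists(path)
}

// No scheme: resolved against the configured default file system
// (HDFS on the setup in this thread, judging by the error message).
println(pathExists("/home/cloudera/complaints_loan.csv"))

// Explicit scheme: the driver's local file system.
println(pathExists("file:///home/cloudera/complaints_loan.csv"))
```

If the first check prints false and the second prints true, the file exists only locally and either has to be uploaded to HDFS or read with the file:// prefix.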