【问题标题】:Scala Spark Streaming files into a dataframeScala Spark 将文件流式传输到数据帧中
【发布时间】:2017-08-25 16:09:44
【问题描述】:

我正在尝试使用 Scala 中的 Apache Spark 从本地文件系统流式传输文件。

我在 Eclipse 中使用所有 spark 库(以及其他一些库)运行以下代码:

package com.addlesee.sparkstreaming

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Level
import Utilities._
import com.lucidworks.spark._
import com.lucidworks.spark.SparkApp._
import com.lucidworks.spark.SparkApp.StreamProcessor
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.SQLContext._
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.FileSystem._

object fileRead {
  def main(args: Array[String]) {

    val conf = new SparkConf().setMaster("local[*]").setAppName("fileRead")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    val options = Map("collection" -> "filetester", "zkhost" -> "localhost:9984", "gen_uniq_key" -> "true")
    val lines = ssc.textFileStream("file:///home/angus/data/data/wikipedia-cats/copy2")
    print("lines= " + lines)

      lines.foreachRDD((rdd, time) => {
        if(rdd.count() > 0){
          val rdd2 = rdd.repartition(1).cache()
          print("rdd= " + rdd)
          print("rdd2= " + rdd2)
          val df = rdd2.toDF()

          df.write.format("solr").options(options).mode(org.apache.spark.sql.SaveMode.Overwrite).save()
        }
    })


    ssc.start()
    ssc.awaitTermination()
  }
}

并收到以下信息:

2017-08-25 16:27:52,425 [main] WARN  NativeCodeLoader  - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-08-25 16:27:52,546 [main] WARN  Utils  - Your hostname, angus-pc resolves to a loopback address: 127.0.1.1; using 192.118.36.174 instead (on interface wlp3s0)
2017-08-25 16:27:52,546 [main] WARN  Utils  - Set SPARK_LOCAL_IP if you need to bind to another address
2017-08-25 16:27:53,029 [main] INFO  log  - Logging initialized @1580ms
2017-08-25 16:27:53,071 [main] INFO  Server  - jetty-9.3.z-SNAPSHOT
2017-08-25 16:27:53,084 [main] INFO  Server  - Started @1635ms
2017-08-25 16:27:53,095 [main] WARN  Utils  - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2017-08-25 16:27:53,101 [main] INFO  AbstractConnector  - Started ServerConnector@13c43d25{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
2017-08-25 16:27:53,126 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@71812481{/jobs,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,127 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@40238dd0{/jobs/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,128 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@79179359{/jobs/job,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,129 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@6a55299e{/jobs/job/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,130 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@4eb386df{/stages,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,131 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@3a0baae5{/stages/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,132 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@289710d9{/stages/stage,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,135 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@503ecb24{/stages/stage/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,136 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@6995bf68{/stages/pool,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,137 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@77825085{/stages/pool/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,138 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@71c27ee8{/storage,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,138 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@5b1ebf56{/storage/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,139 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@4b1d6571{/storage/rdd,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,140 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@3549bca9{/storage/rdd/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,144 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@6fb365ed{/environment,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,145 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@16414e40{/environment/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,146 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@525575{/executors,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,147 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@5a709816{/executors/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,148 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@6ad3381f{/executors/threadDump,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,148 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@db57326{/executors/threadDump/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,159 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@4748a0f9{/static,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,160 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@1941a8ff{/,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,165 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@149dd36b{/api,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,169 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@4f1bfe23{/jobs/job/kill,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,171 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@512535ff{/stages/stage/kill,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,459 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@56ace400{/metrics/json,null,AVAILABLE,@Spark}
lines= org.apache.spark.streaming.dstream.MappedDStream@74a9c4b02017-08-25 16:27:53,852 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@1e34c607{/streaming,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,853 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@36b6964d{/streaming/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,854 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@aa5455e{/streaming/batch,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,854 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@5dda14d0{/streaming/batch/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,855 [main] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@3b4ef7{/static/streaming,null,AVAILABLE,@Spark}
2017-08-25 16:28:46,307 [JobGenerator] INFO  FileInputFormat  - Total input paths to process : 1
rdd= MapPartitionsRDD[106] at textFileStream at file2solr.scala:28rdd2= MapPartitionsRDD[110] at repartition at file2solr.scala:332017-08-25 16:28:47,564 [streaming-job-executor-0] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@30698a7f{/SQL,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,565 [streaming-job-executor-0] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@2ac04ffb{/SQL/json,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,565 [streaming-job-executor-0] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@1b62dd54{/SQL/execution,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,566 [streaming-job-executor-0] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@5b4a1f33{/SQL/execution/json,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,567 [streaming-job-executor-0] INFO  ContextHandler  - Started o.s.j.s.ServletContextHandler@4049fe04{/static/sql,null,AVAILABLE,@Spark}
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: com/typesafe/scalalogging/LazyLogging
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:470)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at com.addlesee.sparkstreaming.fileRead$$anonfun$main$1.apply(file2solr.scala:38)
    at com.addlesee.sparkstreaming.fileRead$$anonfun$main$1.apply(file2solr.scala:31)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.typesafe.scalalogging.LazyLogging
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 50 more

首先,当我直接在目录中创建一个新文件然后重命名它时,我只会收到我的 rdd 打印和错误...移动、重命名或编辑已经存在的文件不会输出任何内容。

我认为问题出在toDF() 函数或我的流文件方法。

我对 Scala 很陌生,所以一定是误解了一些东西。如果有帮助,我已经打印了带有标签的行、rdd 和 rdd2。

我尝试了其他方法,但没有成功在 spark-shell 中运行它。

感谢您的帮助!

【问题讨论】:

  • Eclipse 你可以使用ctrl+shift+t(输入com.typesafe.scalalogging.LazyLogging)它会弹出一个窗口并输入你的类名。它将向您显示类名所属的 jar。如果它不存在将其添加到 Eclipse 类路径并再次运行。
  • 你是对的,谢谢拉姆!
  • 我也发现了我的另一个问题。当我设置流时,Spark 正在寻找创建日期晚于流开始的文档。如果我移动或编辑该目录中的旧文件,则由于它们的创建日期旧,因此不会发生任何事情。

标签: scala apache-spark spark-dataframe spark-streaming


【解决方案1】:

尝试在你的类路径中包含 jar scalalogging-log4j 用于延迟日志记录。

在 pom.xml 中使用 Maven:

<dependency>
    <groupId>com.typesafe</groupId>
    <artifactId>scalalogging-log4j_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

在 sbt 文件中使用 SBT:

libraryDependencies += "com.typesafe" % "scalalogging-log4j_2.10" % "1.1.0"

如果您没有使用任何依赖管理工具,请将其添加到 Eclipse 中的 lib 文件夹中。

【讨论】:

  • 谢谢!我从their GitHub 构建了包含惰性日志记录的 jar,并手动添加了它。在对其他依赖项重复几次之后,它就可以工作了。我仍然必须在目录中创建和重命名我的文档,以便它采取任何奇怪的操作......
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-03
  • 2014-08-07
  • 1970-01-01
  • 2015-07-24
  • 1970-01-01
相关资源
最近更新 更多