【发布时间】:2017-08-25 16:09:44
【问题描述】:
我正在尝试使用 Scala 中的 Apache Spark 从本地文件系统流式传输文件。
我在 Eclipse 中使用所有 spark 库(以及其他一些库)运行以下代码:
package com.addlesee.sparkstreaming
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Level
import Utilities._
import com.lucidworks.spark._
import com.lucidworks.spark.SparkApp._
import com.lucidworks.spark.SparkApp.StreamProcessor
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.SQLContext._
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.FileSystem._
/**
 * Streams text files from a local directory with Spark Streaming and writes
 * every non-empty micro-batch to Solr through the "solr" data source.
 */
object fileRead {

  /**
   * Builds a local Spark context and a 1-second streaming context, watches the
   * input directory for text files, and saves each non-empty batch to Solr.
   * Blocks until the streaming context terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("fileRead")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Brings the implicit RDD -> DataFrame conversion (toDF) into scope.
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    // Options handed to the "solr" data source on every write.
    val options = Map("collection" -> "filetester", "zkhost" -> "localhost:9984", "gen_uniq_key" -> "true")

    // NOTE(review): per the Spark Streaming guide, a file stream only picks up
    // files that appear in the directory after the stream has started.
    val lines = ssc.textFileStream("file:///home/angus/data/data/wikipedia-cats/copy2")
    // println (not print) so the debug output is not fused with the next log line.
    println(s"lines= $lines")

    lines.foreachRDD { rdd =>
      // isEmpty() avoids counting the whole batch just to test for emptiness.
      if (!rdd.isEmpty()) {
        val rdd2 = rdd.repartition(1).cache()
        println(s"rdd= $rdd")
        println(s"rdd2= $rdd2")
        try {
          val df = rdd2.toDF()
          // The "solr" format must be resolvable at runtime together with its
          // own dependencies; otherwise save() fails while loading the source.
          df.write
            .format("solr")
            .options(options)
            .mode(org.apache.spark.sql.SaveMode.Overwrite)
            .save()
        } finally {
          // Release the cached batch once the write finished or failed.
          rdd2.unpersist()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
并收到以下信息:
2017-08-25 16:27:52,425 [main] WARN NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-08-25 16:27:52,546 [main] WARN Utils - Your hostname, angus-pc resolves to a loopback address: 127.0.1.1; using 192.118.36.174 instead (on interface wlp3s0)
2017-08-25 16:27:52,546 [main] WARN Utils - Set SPARK_LOCAL_IP if you need to bind to another address
2017-08-25 16:27:53,029 [main] INFO log - Logging initialized @1580ms
2017-08-25 16:27:53,071 [main] INFO Server - jetty-9.3.z-SNAPSHOT
2017-08-25 16:27:53,084 [main] INFO Server - Started @1635ms
2017-08-25 16:27:53,095 [main] WARN Utils - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
2017-08-25 16:27:53,101 [main] INFO AbstractConnector - Started ServerConnector@13c43d25{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
2017-08-25 16:27:53,126 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@71812481{/jobs,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,127 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@40238dd0{/jobs/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,128 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@79179359{/jobs/job,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,129 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@6a55299e{/jobs/job/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,130 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@4eb386df{/stages,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,131 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@3a0baae5{/stages/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,132 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@289710d9{/stages/stage,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,135 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@503ecb24{/stages/stage/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,136 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@6995bf68{/stages/pool,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,137 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@77825085{/stages/pool/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,138 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@71c27ee8{/storage,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,138 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@5b1ebf56{/storage/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,139 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@4b1d6571{/storage/rdd,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,140 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@3549bca9{/storage/rdd/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,144 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@6fb365ed{/environment,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,145 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@16414e40{/environment/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,146 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@525575{/executors,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,147 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@5a709816{/executors/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,148 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@6ad3381f{/executors/threadDump,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,148 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@db57326{/executors/threadDump/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,159 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@4748a0f9{/static,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,160 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@1941a8ff{/,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,165 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@149dd36b{/api,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,169 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@4f1bfe23{/jobs/job/kill,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,171 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@512535ff{/stages/stage/kill,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,459 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@56ace400{/metrics/json,null,AVAILABLE,@Spark}
lines= org.apache.spark.streaming.dstream.MappedDStream@74a9c4b02017-08-25 16:27:53,852 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@1e34c607{/streaming,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,853 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@36b6964d{/streaming/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,854 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@aa5455e{/streaming/batch,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,854 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@5dda14d0{/streaming/batch/json,null,AVAILABLE,@Spark}
2017-08-25 16:27:53,855 [main] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@3b4ef7{/static/streaming,null,AVAILABLE,@Spark}
2017-08-25 16:28:46,307 [JobGenerator] INFO FileInputFormat - Total input paths to process : 1
rdd= MapPartitionsRDD[106] at textFileStream at file2solr.scala:28rdd2= MapPartitionsRDD[110] at repartition at file2solr.scala:332017-08-25 16:28:47,564 [streaming-job-executor-0] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@30698a7f{/SQL,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,565 [streaming-job-executor-0] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@2ac04ffb{/SQL/json,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,565 [streaming-job-executor-0] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@1b62dd54{/SQL/execution,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,566 [streaming-job-executor-0] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@5b4a1f33{/SQL/execution/json,null,AVAILABLE,@Spark}
2017-08-25 16:28:47,567 [streaming-job-executor-0] INFO ContextHandler - Started o.s.j.s.ServletContextHandler@4049fe04{/static/sql,null,AVAILABLE,@Spark}
Exception in thread "streaming-job-executor-0" java.lang.NoClassDefFoundError: com/typesafe/scalalogging/LazyLogging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.newInstance(Class.java:412)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:470)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at com.addlesee.sparkstreaming.fileRead$$anonfun$main$1.apply(file2solr.scala:38)
at com.addlesee.sparkstreaming.fileRead$$anonfun$main$1.apply(file2solr.scala:31)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.typesafe.scalalogging.LazyLogging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 50 more
首先,只有当我直接在该目录中新建一个文件并将其重命名时,才会看到 rdd 的打印输出和上述错误;移动、重命名或编辑已经存在的文件则不会产生任何输出。
我认为问题出在 toDF() 函数上,或者出在我流式读取文件的方式上。
我对 Scala 很陌生,所以一定是误解了一些东西。如果有帮助,我已经打印了带有标签的行、rdd 和 rdd2。
我也尝试过其他方法,但未能在 spark-shell 中成功运行它。
感谢您的帮助!
【问题讨论】:
-
在 Eclipse 中,你可以按
ctrl+shift+t,在弹出的窗口中输入类名(com.typesafe.scalalogging.LazyLogging),它会显示该类所属的 jar。如果找不到该类,请将相应的 jar 添加到 Eclipse 类路径,然后重新运行。 -
你是对的,谢谢拉姆!
-
我也发现了我的另一个问题。流启动之后,Spark 只会查找创建日期晚于流启动时间的文件。如果我移动或编辑该目录中的旧文件,由于它们的创建日期较早,所以不会触发任何处理。
标签: scala apache-spark spark-dataframe spark-streaming