【问题标题】:Options to read large files (pure text, xml, json, csv) from hdfs in RStudio with SparkR 1.5使用 SparkR 1.5 从 RStudio 中的 hdfs 读取大文件(纯文本、xml、json、csv)的选项
【发布时间】:2015-12-11 16:30:06
【问题描述】:

我是 Spark 的新手,想知道除了以下选项之外,是否还有其他选项可以使用 SparkR 从 RStudio 读取存储在 hdfs 中的数据,或者我是否正确使用它们。数据可以是任何类型(纯文本、csv、json、xml 或任何包含关系表的数据库)和任何大小(1kb - 几个 gb)。

我知道 textFile(sc, path) 不应该再使用了,但是除了 read.df 函数,还有其他可能读取此类数据的方法吗?

以下代码使用 read.df 和 jsonFile 但 jsonFile 产生错误:

Sys.setenv(SPARK_HOME = "C:\\Users\\--\\Downloads\\spark-1.5.0-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
#load the Sparkr library
library(SparkR)

# Create a spark context and a SQL context
sc <- sparkR.init(master="local", sparkPackages="com.databricks:spark-csv_2.11:1.0.3")
sqlContext <- sparkRSQL.init(sc)

#create a sparkR DataFrame
df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/people.json", source = "json")
df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")

read.df 适用于 json,但我如何读取仅由新行分隔的日志消息等文本?例如

> df <- read.df(sqlContext, "hdfs://0.0.0.0:19000/README.txt", "text")
     Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
  java.lang.ClassNotFoundException: Failed to load class for data source: text.
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
    at org.apache.spark.sql.api.r.SQLUtils$.loadDF(SQLUtils.scala:156)
    at org.apache.spark.sql.api.r.SQLUtils.loadDF(SQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
    at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
    at org.apache.spark.ap

jsonFile 的错误是:

> df <- jsonFile(sqlContext, "hdfs://0.0.0.0:19000/people.json")
    Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  java.io.IOException: No input paths specified in job
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:201)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfu

我不知道为什么 read.df 会抛出错误,因为我没有重新启动 SparkR 或调用 SparkR.stop()

对于相同的代码,除了使用 read.df 之外,我使用 SparkR:::textFile 函数和 sc 而不是 sqlContext(遵循 amplab 上的过时介绍)。

错误信息是:

data <- SparkR:::textFile(sc, "hdfs://0.0.0.0:19000/people.json")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : 
  java.lang.IllegalArgumentException: java.net.URISyntaxException: Expected scheme-specific part at index 5: hdfs:
    at org.apache.hadoop.fs.Path.initialize(Path.java:206)
    at org.apache.hadoop.fs.Path.<init>(Path.java:172)
    at org.apache.hadoop.fs.Path.<init>(Path.java:94)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:211)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at or

这个错误看起来是路径不正确,但我不知道为什么。

我目前使用的:

spark-1.5.0-bin-hadoop2.6 hadoop-2.6.0 视窗(8.1) R 版本 3.2.2 Rstudio 版本 0.99.484

我希望有人可以在这里给我一些关于这个问题的提示。

【问题讨论】:

    标签: r sparkr apache-spark-1.5


    【解决方案1】:

    试试

        % hadoop fs -put people.json /
        % sparkR
        > people <- read.df(sqlContext, "/people.json", "json")
        > head(people) 
    

    【讨论】:

      【解决方案2】:

      您可能需要一个用于解析其他文件的库,例如 DataBricks CSV 库:

      https://github.com/databricks/spark-csv

      然后您将在加载包的情况下启动 R,例如:

      $ sparkR --packages com.databricks:spark-csv_2.10:1.0.3

      然后像这样加载你的文件:

      &gt; df &lt;- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", inferSchema = "true")

      这假设您的 hdfs 主目录中有“cars.csv”测试文件。

      【讨论】:

      • 他在 spark init 中使用了这个 - 他说的是从 RStudio 而不是从命令行启动。
      猜你喜欢
      • 1970-01-01
      • 2015-11-27
      • 2016-02-04
      • 1970-01-01
      • 2013-06-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多