【问题标题】:AWS EMR Spark - get CSV And use with SparkSql apiAWS EMR Spark - 获取 CSV 并与 SparkSql api 一起使用
【发布时间】:2017-11-22 19:58:56
【问题描述】:
//download file  csv
ByteArrayOutputStream downloadedFile = downloadFile();

//save file in temp folder csv   (
java.io.File tmpCsvFile = save(downloadedFile);

//reading
Dataset<Row> ds = session
        .read()
        .option("header", "true") 
        .csv(tmpCsvFile.getAbsolutePath())

tmpCsvFile保存在以下路径

/mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q31188872422

阅读异常

org.apache.spark.sql.AnalysisException:路径不存在: HDFS://ip-33-33-33-33.ec2.internal:8020的/ mnt /纱线/ usercache / hadoop的/应用程序缓存/ application_1511379756333_0001 / container_1511379756333_0001_02_000001 / TMP / 1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv; P>

我认为问题在于文件保存在本地,当我尝试通过 spark-sql api 读取时找不到文件。 我已经尝试过 sparkContext.addFile() 并且不起作用。

有什么解决办法吗?

谢谢

【问题讨论】:

  • 文件 /mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/.. 似乎是一个本地文件。您是否尝试使用 file:/// 选项阅读。试试这个 Dataset ds = session.read().option("header", "true").csv(file:///tmpCsvFile.getAbsolutePath())

标签: java apache-spark apache-spark-sql spark-dataframe emr


【解决方案1】:

Spark 支持大量文件系统,用于读写。

  • 本地/常规 (file://)
  • S3 (s3://)
  • HDFS (hdfs://)

如果没有指定 URI,作为标准行为,spark-sql 将使用 hdfs://driver_address:port/path。

添加file:///到路径的解决方案,只能在客户端模式下工作,在我的情况下(集群)不行。 当驱动程序创建用于读取文件的任务时,它将被传递给执行程序,以到达没有该文件的节点之一。

我们能做什么?在 Hadoop 上写一个文件。

   Configuration conf = new Configuration();
   ByteArrayOutputStream downloadedFile = downloadFile();
   //convert outputstream in inputstream
   InputStream is=Functions.FROM_BAOS_TO_IS.apply(fileOutputStream);
   String myfile="miofile.csv";
   //acquiring the filesystem
   FileSystem fs = FileSystem.get(URI.create(dest),conf);
   //openoutputstream to hadoop
   OutputStream outf = fs.create( new Path(dest));
   //write file 
   IOUtils.copyBytes(tmpIS, outf, 4096, true);
   //commit the read task
   Dataset<Row> ds = session
    .read()
    .option("header", "true") 
    .csv(myfile)

谢谢,欢迎任何更好的解决方案

【讨论】:

    猜你喜欢
    • 2022-10-21
    • 1970-01-01
    • 2016-08-31
    • 2018-04-26
    • 1970-01-01
    • 2019-06-07
    • 2023-04-04
    • 2021-12-22
    • 1970-01-01
    相关资源
    最近更新 更多