【问题标题】:iterate over files in pyspark from hdfs directory从 hdfs 目录遍历 pyspark 中的文件
【发布时间】:2019-09-10 22:13:32
【问题描述】:

我有 hdfs 目录中的文件列表,我想从 hdfs 目录迭代 pyspark 中的文件并将每个文件存储在一个变量中并使用该变量进行进一步处理。我在下面遇到错误..

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonUtils.toSeq. Trace: 
py4j.Py4JException: Method toSeq([class org.apache.hadoop.fs.Path]) does not exist

InputDir = "/Data/Ready/ARRAY_COUNTERS"#输入hdfs目录

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path(InputDir)

for f in fs.get(conf).listStatus(path):
    Filename =  f.getPath()

df = spark.read.csv(Filename,header=True)
#I am getting above error in while reading this file.

【问题讨论】:

  • 你可以尝试使用 as Filename = str(f.getPath())

标签: pyspark


【解决方案1】:

关于这 2 行:

    Filename =  f.getPath()

df = spark.read.csv(Filename,header=True)

getPath() 不是字符串。此外 - f 也可以是目录,因此为确保您没有尝试加载目录,您可以在 f.isFile() 上添加验证:

if(f.isFile()):
    Filename =  f.getPath()
    df = spark.read.csv(str(Filename),header=True)

现在对我有用的替代方法是:

if(f.isFile()):
    Filename =  f.getPath()
    df = sc.textFile(str(Filename), 500).map(lambda x: x.split(", ")) #or any other spearator, returns RDD
    headers=df.first() # to infer schema - you can then convert it to pyspark dataframe with specific column types

【讨论】:

  • 当我使用 str(Filename) 时出现此错误。实际上我之前使用过这个 str 并没有解决。pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from an empty string'
  • 第二个选项也不起作用..输入路径不存在
  • with str(Filename) getting error :pyspark.sql.utils.IllegalArgumentException: u'Can not create a Path from an empty string'
  • 在开始处理文件名之前尝试添加if(f.isFile()):。我认为您只是在处理初始目录,这就是该错误的来源。 (参考:hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/…
  • 别担心,@vikrantrana - 我添加了你提到的行
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-12-16
  • 2017-05-20
  • 1970-01-01
  • 2015-03-05
  • 2011-06-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多