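【Question title】: Reading HDFS and local files in Java
【Posted】: 2020-12-23 23:20:20
【Question】:

I want to read files from paths regardless of whether they are on HDFS or on the local file system. Currently I pass local paths with a file:// prefix and HDFS paths with an hdfs:// prefix, and I wrote the following code: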

Configuration configuration = new Configuration();
FileSystem fileSystem = null;
if (filePath.startsWith("hdfs://")) {
  fileSystem = FileSystem.get(configuration);
} else if (filePath.startsWith("file://")) {
  fileSystem = FileSystem.getLocal(configuration).getRawFileSystem();
}

From here I use the FileSystem API to read the file.

Is there a better way to do this?

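【Comments】:

  • Why are you unhappy with your current approach?
  • I'm not unhappy as such. I want my method to accept a Path object, and I would like to know whether Path has any method that tells me if the path belongs to the local file system or to HDFS. I tried calling toString() on the Path and doing the comparison above, but that did not work. I had to call toUri().toString() on the Path and run the check on that.
  • I'm not sure whether I needed to create a new post for this. If I should have, I apologize. Put differently, my question is: if I have a Path rather than a String, how do I find out whether it points to HDFS or to the local file system? Should I call toUri().toString() and check the prefix as in my original post, or call toUri() and check the scheme? Thanks.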
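
The scheme check asked about in the comments can be sketched as follows. This is a minimal illustration and not code from the thread: it uses only java.net.URI, and the names PathSchemeCheck, isHdfs and isLocal are made up for the example. Hadoop's Path.toUri() returns a java.net.URI, so the same getScheme() comparison should apply to a Path as well.

```java
import java.net.URI;

public class PathSchemeCheck {

    /** Returns true when the URI's scheme is "hdfs" (case-insensitive). */
    public static boolean isHdfs(URI uri) {
        return "hdfs".equalsIgnoreCase(uri.getScheme());
    }

    /**
     * Returns true when the URI's scheme is "file". A URI without a scheme
     * yields false: Hadoop would resolve such a path against the configured
     * default file system, which may or may not be the local one.
     */
    public static boolean isLocal(URI uri) {
        return "file".equalsIgnoreCase(uri.getScheme());
    }

    public static void main(String[] args) {
        // With a Hadoop Path this would be: isHdfs(path.toUri())
        System.out.println(isHdfs(URI.create("hdfs://namenode:8020/data/in.txt")));
        System.out.println(isLocal(URI.create("file:///tmp/in.txt")));
        System.out.println(isLocal(URI.create("/tmp/in.txt")));
    }
}
```

Comparing the scheme avoids prefix matching on the string form of the path, and it makes the "no scheme at all" case explicit instead of silently treating it as local.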

Tags: java hadoop mapreduce hdfs


【Answer 1】:

Does this make sense?

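public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    // Load the cluster settings; adjust these paths to your installation
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/core-site.xml"));
    conf.addResource(new Path("/hadoop/projects/hadoop-1.0.4/conf/hdfs-site.xml"));

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    System.out.println("Enter the file path...");
    String filePath = br.readLine();

    // The URI scheme of the path (hdfs://, file://, ...) selects the FileSystem
    Path path = new Path(filePath);
    FileSystem fs = path.getFileSystem(conf);
    // try-with-resources closes the stream even if reading fails
    try (FSDataInputStream inputStream = fs.open(path)) {
        System.out.println(inputStream.available());
    }
    // Fine in a short-lived main(); FileSystem instances are cached by default,
    // so in shared code closing one also closes it for other users
    fs.close();
}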

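If you go this way, you don't have to check the prefix. Get the FileSystem directly from the Path and then do whatever you like with it.

【Comments】:

  • Hi Tariq, can the local source file be on an edge node? Also, do you have a complete example with a mapper class?
【Answer 2】:

You can get the FileSystem in the following way: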

Configuration conf = new Configuration();
Path path = new Path(stringPath);
FileSystem fs = FileSystem.get(path.toUri(), conf);

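You don't need to check whether the path starts with hdfs:// or file://. This API does that for you.

【Comments】:

  • Tariq's solution does the same thing. Path.getFileSystem calls this FileSystem.get(URI, Configuration) method.
【Answer 3】:

Please check the code snippet below, which lists files from an HDFS path, i.e. a path string starting with hdfs://. If you supply a Hadoop configuration and a local path, it will also list files from the local file system, i.e. a path string starting with file://.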

    //helper method to get the list of files from the HDFS path
    public static List<String> listFilesFromHDFSPath(Configuration hadoopConfiguration, String hdfsPath,
                                                     boolean recursive)
    {
        //resulting list of files
        List<String> filePaths = new ArrayList<String>();
        FileSystem fs = null;

        //try-catch-finally all possible exceptions
        try
        {
            //get path from string and then the filesystem
            Path path = new Path(hdfsPath);  //throws IllegalArgumentException, all others will only throw IOException
            fs = path.getFileSystem(hadoopConfiguration);

            //resolve hdfsPath first to check whether the path exists => either a real directory or a real file
            //resolvePath() returns fully-qualified variant of the path
            path = fs.resolvePath(path);


            //if recursive approach is requested
            if (recursive)
            {
                //(heap issues with recursive approach) => using a queue
                Queue<Path> fileQueue = new LinkedList<Path>();

                //add the obtained path to the queue
                fileQueue.add(path);

                //while the fileQueue is not empty
                while (!fileQueue.isEmpty())
                {
                    //get the file path from queue
                    Path filePath = fileQueue.remove();

                    //filePath refers to a file
                    if (fs.isFile(filePath))
                    {
                        filePaths.add(filePath.toString());
                    }
                    else   //else filePath refers to a directory
                    {
                        //list paths in the directory and add to the queue
                        FileStatus[] fileStatuses = fs.listStatus(filePath);
                        for (FileStatus fileStatus : fileStatuses)
                        {
                            fileQueue.add(fileStatus.getPath());
                        } // for
                    } // else

                } // while

            } // if
            else        //non-recursive approach => no heap overhead
            {
                //if the given hdfsPath is actually directory
                if (fs.isDirectory(path))
                {
                    FileStatus[] fileStatuses = fs.listStatus(path);

                    //loop all file statuses
                    for (FileStatus fileStatus : fileStatuses)
                    {
                        //if the given status is a file, then update the resulting list
                        if (fileStatus.isFile())
                            filePaths.add(fileStatus.getPath().toString());
                    } // for
                } // if
                else        //it is a file then
                {
                    //return the one and only file path to the resulting list
                    filePaths.add(path.toString());
                } // else

            } // else

        } // try
        catch(Exception ex) //will catch all exception including IOException and IllegalArgumentException
        {
            ex.printStackTrace();

            //if some problem occurs return an empty array list
            return new ArrayList<String>();
        } //
        finally
        {
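            //close filesystem; no more operations (FileSystem instances are cached by default, so this also closes it for other users of the same instance)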
            try
            {
                if(fs != null)
                    fs.close();
            } catch (IOException e)
            {
                e.printStackTrace();
            } // catch

        } // finally


        //return the resulting list; list can be empty if given path is an empty directory without files and sub-directories
        return filePaths;
    } // listFilesFromHDFSPath

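If you really want to use the java.io.File API, the following method lists files from the local file system only, i.e. the paths you would otherwise write with a file:// prefix. Note that new File(String) expects a plain path, so strip the file:// prefix (or use new File(URI)) before calling it.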

    //helper method to list files from the local path in the local file system
    public static List<String> listFilesFromLocalPath(String localPathString, boolean recursive)
    {
        //resulting list of files
        List<String> localFilePaths = new ArrayList<String>();

        //get the Java file instance from local path string
        File localPath = new File(localPathString);


        //this case is possible if the given localPathString does not exist => which means neither file nor a directory
        if(!localPath.exists())
        {
            System.err.println("\n" + localPathString + " is neither a file nor a directory; please provide correct local path");

            //return with empty list
            return new ArrayList<String>();
        } // if


        //at this point localPath does exist in the file system => either as a directory or a file


        //if recursive approach is requested
        if (recursive)
        {
            //recursive approach => using a queue
            Queue<File> fileQueue = new LinkedList<File>();

            //add the file in obtained path to the queue
            fileQueue.add(localPath);

            //while the fileQueue is not empty
            while (!fileQueue.isEmpty())
            {
                //get the file from queue
                File file = fileQueue.remove();

                //file instance refers to a file
                if (file.isFile())
                {
                    //update the list with file absolute path
                    localFilePaths.add(file.getAbsolutePath());
                } // if
                else   //else file instance refers to a directory
                {
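                    //list files in the directory and add to the queue
                    File[] listedFiles = file.listFiles();
                    //listFiles() returns null on an I/O error or if the directory cannot be read
                    if (listedFiles != null)
                    {
                        for (File listedFile : listedFiles)
                        {
                            fileQueue.add(listedFile);
                        } // for
                    } // if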
                } // else

            } // while
        } // if
        else        //non-recursive approach
        {
            //if the given localPathString is actually a directory
            if (localPath.isDirectory())
            {
                File[] listedFiles = localPath.listFiles();

                //loop all listed files; listFiles() returns null if the directory cannot be read
                if (listedFiles != null)
                {
                    for (File listedFile : listedFiles)
                    {
                        //if the given listedFile is actually a file, then update the resulting list
                        if (listedFile.isFile())
                            localFilePaths.add(listedFile.getAbsolutePath());
                    } // for
                } // if
            } // if
            else        //it is a file then
            {
                //return the one and only file absolute path to the resulting list
                localFilePaths.add(localPath.getAbsolutePath());
            } // else
        } // else


        //return the resulting list; list can be empty if given path is an empty directory without files and sub-directories
        return localFilePaths;
    } // listFilesFromLocalPath
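
For the local-only case, java.nio.file offers a shorter alternative to the queue-based traversal above. The following is a sketch under stated assumptions and not part of the original answer: the class name LocalFileLister and its listFiles method are hypothetical, and Java 8 or later is assumed. Note that java.nio.file.Path is unrelated to Hadoop's org.apache.hadoop.fs.Path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalFileLister {

    /**
     * Lists the regular files under root as absolute path strings.
     * With recursive == false only root itself and its direct children
     * are visited (walk depth 1); directories are never included.
     */
    public static List<String> listFiles(Path root, boolean recursive) throws IOException {
        int maxDepth = recursive ? Integer.MAX_VALUE : 1;
        // Files.walk returns a lazily populated stream that must be closed
        try (Stream<Path> paths = Files.walk(root, maxDepth)) {
            return paths.filter(Files::isRegularFile)
                        .map(p -> p.toAbsolutePath().toString())
                        .sorted()
                        .collect(Collectors.toList());
        }
    }
}
```

One behavioral difference to be aware of: for a path that does not exist, this sketch throws an IOException (a NoSuchFileException) instead of printing a message and returning an empty list, so the caller decides how to handle it. If root is a regular file, the result contains just that file, as in the method above.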

【Comments】:

【Answer 4】:

This works.

    package com.leerhdfs;
    
    //import org.apache.commons.io.IOUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.util.Progressable;
    
    import java.io.*;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    
    public class ReadWriteHDFSExample {
    
    public static void main(String[] args) throws IOException {
    
        // args[0]: source path, args[1]: destination path (file:// or hdfs://)
        Path inFile = new Path(args[0]);
        Path outFile = new Path(args[1]);
        
        Configuration conf = new Configuration();
        
        // Resolve each FileSystem from its own path, so that the source and
        // the destination may live on different file systems
        FileSystem inFs = inFile.getFileSystem(conf);
        FileSystem outFs = outFile.getFileSystem(conf);
        FSDataInputStream in = inFs.open(inFile);
        
        // Progressable callback: invoked periodically to report progress while writing
        OutputStream out = outFs.create(outFile, new Progressable() {
            
            public void progress() {
                System.out.println("Reading and writing...");
            }
        });
        
        // Copies with a 4096-byte buffer; the final 'true' closes both streams
        IOUtils.copyBytes(in, out, 4096, true);
      }
    
    }
    

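【Comments】:

  • While this code may solve the problem, including an explanation of how and why it solves the problem would really help to improve the quality of your post, and would probably result in more upvotes. Remember that you are answering the question for readers in the future, not just the person asking now. Please edit your answer to add an explanation and to indicate which limitations and assumptions apply. (From Review)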
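
As a rough explanation of the copy step in the fourth answer: IOUtils.copyBytes(in, out, 4096, true) reads from the input stream in chunks of up to 4096 bytes, writes each chunk to the output stream, and closes both streams when it is done. The sketch below imitates that loop with plain java.io streams. It is an illustration only, not Hadoop's actual implementation, and StreamCopySketch and copy are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class StreamCopySketch {

    /** Copies all bytes from in to out through a fixed-size buffer; returns the byte count. */
    public static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        // read() returns -1 at end of stream
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello hdfs".getBytes(StandardCharsets.UTF_8);
        // try-with-resources closes both streams, similar to passing close = true
        try (InputStream in = new ByteArrayInputStream(data);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            long copied = copy(in, out, 4096);
            System.out.println(copied + " bytes copied");
        }
    }
}
```

Because the streams returned by FileSystem.open and FileSystem.create are ordinary InputStream and OutputStream subclasses, the same loop works regardless of whether the underlying file system is local or HDFS.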