【发布时间】:2023-04-10 10:58:03
【问题描述】:
我有一个需求,我需要从 FTP 服务器下载某些 .Zip 文件,并将存档的内容(内容是一些 XML 文件)推送到 HDFS(Hadoop 分布式文件系统)。因此,到目前为止,我正在使用 aapache FTPClient 连接到 FTP 服务器并首先将文件下载到我的本地计算机。稍后将其解压缩并将文件夹路径提供给一个方法,该方法将迭代本地文件夹并将文件推送到 HDFS。为了便于理解,我还在下面附上了一些代码sn-ps。
//Gives me an active FTPClient
FTPClient ftpCilent = getActiveFTPConnection();
ftpCilent.changeWorkingDirectory(remoteDirectory);
FTPFile[] ftpFiles = ftpCilent.listFiles();
if(ftpFiles.length <= 0){
logger.info("Unable to find any files in given location!!");
return;
}
//Iterate files
for(FTPFile eachFTPFile : ftpFiles){
String ftpFileName = eachFTPFile.getName();
//Skips files if not .zip files
if(!ftpFileName.endsWith(".zip")){
continue;
}
System.out.println("Reading File -->" + ftpFileName);
/*
* location is the path on local system given by user
* usually loaded by a property file.
*
* Create a archiveLocation where archived files are
* downloaded from FTP.
*/
String archiveFileLocation = location + File.separator + ftpFileName;
String localDirName = ftpFileName.replaceAll(".zip", "");
/*
* localDirLocation is the location where a folder is created
* by the name of the archive in the FTP and the files are copied to
* respective folders.
*
*/
String localDirLocation = location + File.separator + localDirName;
File localDir = new File(localDirLocation);
localDir.mkdir();
File archiveFile = new File(archiveFileLocation);
FileOutputStream archiveFileOutputStream = new FileOutputStream(archiveFile);
ftpCilent.retrieveFile(ftpFileName, archiveFileOutputStream);
archiveFileOutputStream.close();
//Delete the archive file after coping it's contents
FileUtils.forceDeleteOnExit(archiveFile);
//Read the archive file from archiveFileLocation.
ZipFile zip = new ZipFile(archiveFileLocation);
Enumeration entries = zip.entries();
while(entries.hasMoreElements()){
ZipEntry entry = (ZipEntry)entries.nextElement();
if(entry.isDirectory()){
logger.info("Extracting directory " + entry.getName());
(new File(entry.getName())).mkdir();
continue;
}
logger.info("Extracting File: " + entry.getName());
IOUtils.copy(zip.getInputStream(entry), new FileOutputStream(
localDir.getAbsolutePath() + File.separator + entry.getName()));
}
zip.close();
/*
* Iterates the folder location provided and load the files to HDFS
*/
loadFilesToHDFS(localDirLocation);
}
disconnectFTP();
现在,这种方法的问题是,应用程序需要花费大量时间将文件下载到本地路径、解压缩然后将它们加载到 HDFS。有没有更好的方法可以让我即时从 FTP 中提取 Zip 的内容,并将内容流直接提供给方法 loadFilesToHDFS() 而不是本地系统的路径?
【问题讨论】:
-
ZipInputStream?另外,这是 2016 年,因此请避免使用File... 改用Files和Path -
嘿@fge,我本可以使用
ZipInputStream,但它需要InputStream作为输入,从FTPClient写入OutputStream。所以我在写FileOutputStream和从InputStream读取时面临时间滞后。对这种情况有什么建议吗? -
查看
FTPClient的文档...有一个方法可以满足你的需要:它叫做.retrieveFileStream()。 -
嘿@fge 非常感谢您的投入。效果很好。!!