【问题标题】:reading a file from HDFS only after it is fully written and closed只有在完全写入并关闭文件后才从 HDFS 读取文件
【发布时间】:2013-12-10 09:19:07
【问题描述】:

我有两个进程正在运行。一个是将文件写入 HDFS,另一个是加载这些文件。

第一个进程(写入文件的进程)正在使用:

private void writeFileToHdfs(byte[] sourceStream, Path outFilePath) {
FSDataOutputStream out = null;
try {
    // create the file
    out = getFileSystem().create(outFilePath);
    out.write(sourceStream);
} catch (Exception e) {
    LOG.error("Error while trying to write a file to hdfs", e);
} finally {
    try {
    if (null != out)
        out.close();
    } catch (IOException e) {
    LOG.error("Could not close output stream to hdfs", e);
    }
}
}

第二个进程读取这些文件以进行进一步处理。 创建文件时,首先创建它,然后填充内容。此过程需要时间(几毫秒,但仍然如此),在此期间,第二个过程可能会在文件完全写入和关闭之前拾取文件。

请注意,HDFS 不会在名称节点中保留锁定信息 - 因此没有守护程序可以在访问文件之前检查文件是否已锁定。

我想知道解决此问题的最佳方法是什么。

以下是我的想法:

  1. 将文件完全写入并关闭后复制到新文件夹,然后进行第二个过程 将从这个新文件夹中读取。
  2. 根据某些命名约定重命名文件,一旦完全写入并关闭,然后是第二个进程 将根据此命名约定读取。

我有一种感觉,我正在尝试解决一个众所周知的问题,但我错过了一些东西。有解决此类问题的最佳做法吗?

【问题讨论】:

  • 旁注:如果您使用的是 Java 7,则不需要做所有最后的事情,只需尝试使用资源
  • 为什么不使用套接字在进程之间进行通信? P1 可以与 P2 通信,同时转储文件,这样如果它们不同时在线,P2 仍然可以稍后再接......
  • @RossDrew 我正在使用 java7,但对它很陌生。我去看看,谢谢。
  • @UmNyobe 我不确定我会得到你。我宁愿将流程解耦。

标签: java io hdfs java-io


【解决方案1】:

Apache commons 有一些东西。只需touch 文件,错误就会告诉您它是否已被锁定。

import org.apache.commons.io.*

boolean fileAvail = false;

try {
    FileUtils.touch(fileName); //throws IOException if being used
    fileAvail = true;
} catch (IOException e) {
    fileAvail = false;
}

(也)尝试资源

在 Java 7 中,您可以在任何实现 Closable 的东西上使用此功能,例如文件、套接字和数据库连接,一旦执行此操作结束 try 块的范围,它将自动关闭

 try (FSDataOutputStream out = getFileSystem().create(outFilePath))
 {
   //use out in here
 }
 //No finally required - catch is optional

...保存所有额外的代码

【讨论】:

  • 文件以读权限打开时是否触发异常?
  • 在你跳跃之前注意:不要使用异常来控制流量。
  • 关于 Try with Resources - Red 关于它,学到了一些新东西并更改了我的代码(谢谢!)。关于您的解决方案,我不确定消费者是否可以这样做,让我检查一下,稍后再详细说明。
  • @RossDrew 这对于标准文件系统来说是一个很好的答案,但我使用的是 Hadoop 的 HDFS,并且这个 HDFS 不会在名称节点中保留锁定信息 - 所以那里没有可以检查的守护进程如果文件在访问之前被锁定。 (更新了问题)我的猜测是你的代码永远不会抛出异常——不管文件是否被使用。
  • 那么我认为@raphw 解决方案可能是您最好的选择。
【解决方案2】:

您是在同一个 (JVM) 进程中谈论 two separate processes here or about two separate threads 吗?

两种方式,这是一个consumer-producer problem,而您缺少的是生产者和消费者之间的some proper synchronization。如果您在同一个 JVM 进程中运行两个线程,则可以使用 BlockingQueue 来将某种 file-transfer-finished token 从生产者传输到消费者,例如文件完全写入并关闭其流后的文件名。一旦在队列中找到文件名,消费者就可以确定该文件已完全写入并关闭,因为这已由生产者确认。

但是,如果您使用两个不同的进程,则问题会有点难以解决,具体取决于其他组件的语言和网络设置,但您必须实现某种可由两者使用的队列进程,例如通过本地网络端口发送一些信息,以便进程知道彼此的工作。

无论如何,我总是避免在文件系统上移动文件,因为与发送简单令牌相比,这是一项相当昂贵的操作。此外,移动 arround 文件可能会暴露尚未完全移动的文件,具体取决于您使用的语言。

【讨论】:

  • 确实,如果我们谈论的是 2 个线程,那是我熟悉的一个简单的生产者-消费者问题。但我说的是 2 个完全不同的进程,在不同的机器上运行。
  • 同样的问题:通过一个开放的端口发送一条消息,生产进程向消费进程确认文件 X 应该被处理。我会避免从文件系统中推断出这种状态。这样,您还可以在以后的某个状态添加新的消费者和生产者,并添加一些负载平衡。
  • 您正在排除问题!假设另一个应用程序是我们无权更改的第三方应用程序,那么呢?
【解决方案3】:

这里真的需要两个进程吗?为什么不创建两个线程然后加入它。

【讨论】:

  • 这不取决于我。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-03-31
  • 2013-08-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多