【问题标题】:Java: Watching a directory to move large filesJava:监视目录以移动大文件
【发布时间】:2011-03-23 02:44:21
【问题描述】:

我一直在编写一个监视目录的程序,当在其中创建文件时,它会更改名称并将它们移动到新目录。在我的第一个实现中,我使用了 Java 的 Watch Service API,它在我测试 1kb 文件时运行良好。出现的问题是,实际上创建的文件大小在 50-300mb 之间。发生这种情况时,观察者 API 会立即找到该文件,但无法移动它,因为它仍在写入中。我尝试将观察者置于一个循环中(在可以移动文件之前会产生异常),但这似乎效率很低。

由于这不起作用,我尝试使用一个计时器,该计时器每 10 秒检查一次文件夹,然后尽可能移动文件。这是我最终采用的方法。

问题:在没有进行异常检查或不断比较大小的情况下,是否会在文件写入完成时发出信号?我喜欢对每个文件只使用一次 Watcher API 的想法,而不是不断地使用计时器检查(并遇到异常)。

非常感谢所有回复!

nt

【问题讨论】:

  • I tried putting the watcher in a loop (which generated exceptions until the file could be moved) but this seemed pretty inefficient. 是的,这是一个糟糕的解决方案。管理控制流不例外。
  • 可悲的是@ntmp,从我到目前为止的测试来看,寻找异常是判断操作系统仍在“写入”或“复制”文件的最佳方式。但我同意@Sean Patrick Floyd 的观点,这是一种让它发挥作用的糟糕方法。我个人希望检查是 java.io.File API 的一部分。不知道为什么没有。将留给 JVM 人员来实现并使我们的开发人员更容易......
  • “检查异常”方法甚至不适用于 UNIX,因为 UNIX 文件系统不会锁定正在写入的文件。在 UNIX 上,java 会愉快地移动部分写入的文件,从而导致数据损坏。

标签: java directory watch


【解决方案1】:

写入另一个文件作为原始文件已完成的指示。 如果完成创建文件 'fileorg.done' 并检查,ig 'fileorg.dat' 正在增长 仅适用于“fileorg.done”。

使用巧妙的命名约定,您应该不会有任何问题。

【讨论】:

    【解决方案2】:

    两种解决方案:

    第一个是the answer by stacker的细微变化:

    对不完整的文件使用唯一的前缀。类似于myhugefile.zip.inc 而不是myhugefile.zip。上传/创建完成后重命名文件。从手表中排除 .inc 文件。

    第二种是使用同一驱动器上的不同文件夹来创建/上传/写入文件,并在准备好后将它们移动到监视文件夹。如果它们在同一个驱动器上,移动应该是一个原子操作(我猜是依赖于文件系统)。

    无论哪种方式,创建文件的客户端都必须做一些额外的工作。

    【讨论】:

    • 问题是我对创建文件的客户端几乎没有控制权。我无法添加唯一前缀。我也可以指定写入文件的文件夹,但我不能告诉客户端在完成写入后将它们移动到另一个文件夹。
    • @ntmp 你有没有解决这个问题,请分享给我,因为我也面临同样的问题
    【解决方案3】:

    我推测 java.io.File.canWrite() 会告诉你文件何时完成写入。

    【讨论】:

    • 我尝试了一个快速测试,一个线程写入文件,而另一个线程检查 canWrite() 方法,但它总是返回 true。
    • 实际上我相信它只是检查操作系统以查看您是否有写入权限。从安全的角度来看,您可能有权限,但从等待它完成写入的角度来看,您可能没有权限。
    【解决方案4】:

    这是一个非常有趣的讨论,当然这是一个生死攸关的用例:等待一个新文件被创建,然后以某种方式对该文件作出反应。这里的竞争条件很有趣,因为这里的高级要求当然是获取一个事件,然后实际获得(至少)文件的读锁。对于大文件或只是创建大量文件,这可能需要整个工作线程池,它们只是定期尝试锁定新创建的文件,并且当它们成功时,实际完成工作。但我确信 NT 意识到,必须谨慎地执行此操作以使其可扩展,因为它最终是一种轮询方法,而可扩展性和轮询并不是两个可以很好地结合在一起的词。

    【讨论】:

      【解决方案5】:

      我今天遇到了同样的问题。我的用例在实际导入文件之前有一点延迟并不是什么大问题,我仍然想使用 NIO2 API。我选择的解决方案是等到文件没有被修改 10 秒后再对其执行任何操作。

      实现的重要部分如下。程序一直等待,直到等待时间到期或发生新事件。每次修改文件时都会重置过期时间。如果一个文件在等待时间到期之前被删除,它将从列表中删除。我使用poll方法,超时时间为预期过期时间,即(lastmodified+waitTime)-currentTime

      private final Map<Path, Long> expirationTimes = newHashMap();
      private Long newFileWait = 10000L;
      
      public void run() {
          for(;;) {
              //Retrieves and removes next watch key, waiting if none are present.
              WatchKey k = watchService.take();
      
              for(;;) {
                  long currentTime = new DateTime().getMillis();
      
                  if(k!=null)
                      handleWatchEvents(k);
      
                  handleExpiredWaitTimes(currentTime);
      
                  // If there are no files left stop polling and block on .take()
                  if(expirationTimes.isEmpty())
                      break;
      
                  long minExpiration = min(expirationTimes.values());
                  long timeout = minExpiration-currentTime;
                  logger.debug("timeout: "+timeout);
                  k = watchService.poll(timeout, TimeUnit.MILLISECONDS);
              }
          }
      }
      
      private void handleExpiredWaitTimes(Long currentTime) {
          // Start import for files for which the expirationtime has passed
          for(Entry<Path, Long> entry : expirationTimes.entrySet()) {
              if(entry.getValue()<=currentTime) {
                  logger.debug("expired "+entry);
                  // do something with the file
                  expirationTimes.remove(entry.getKey());
              }
          }
      }
      
      private void handleWatchEvents(WatchKey k) {
          List<WatchEvent<?>> events = k.pollEvents();
          for (WatchEvent<?> event : events) {
              handleWatchEvent(event, keys.get(k));
          }
          // reset watch key to allow the key to be reported again by the watch service
          k.reset();
      }
      
      private void handleWatchEvent(WatchEvent<?> event, Path dir) throws IOException {
          Kind<?> kind = event.kind();
      
          WatchEvent<Path> ev = cast(event);
              Path name = ev.context();
              Path child = dir.resolve(name);
      
          if (kind == ENTRY_MODIFY || kind == ENTRY_CREATE) {
              // Update modified time
              FileTime lastModified = Attributes.readBasicFileAttributes(child, NOFOLLOW_LINKS).lastModifiedTime();
              expirationTimes.put(name, lastModified.toMillis()+newFileWait);
          }
      
          if (kind == ENTRY_DELETE) {
              expirationTimes.remove(child);
          }
      }
      

      【讨论】:

      • 此线程中的最佳答案 - 现在是 2013 年,他们已经在 J​​ava 中修复了这个问题,还是仍然需要使用这样的代码?
      • 方法 handleExpiredWaitTimes 在迭代时删除条目,所以应该使用迭代器。
      【解决方案6】:

      当我实现一个文件系统观察器来传输上传的文件时,我不得不处理类似的情况。我为解决这个问题而实施的解决方案包括以下内容:

      1-首先,维护一个未处理文件的Map(只要文件还在被复制,文件系统就会产生Modify_Event,如果flag为false,你可以忽略它们)。

      2- 在你的文件处理器中,你从列表中取出一个文件并检查它是否被文件系统锁定,如果是,你会得到一个异常,只需捕获这个异常并将你的线程置于等待状态(即 10 秒)然后重试,直到锁被释放。处理完文件后,您可以将标志更改为 true 或将其从地图中移除。

      如果在等待时间段内传输同一文件的多个版本,此解决方案将效率不高。

      干杯, 拉姆齐

      【讨论】:

        【解决方案7】:

        虽然在 SO 完成复制时不可能通过 Watcher Service API 通知,但所有选项似乎都在“变通”(包括这个!)。

        如上所述,

        1) 在 UNIX 上不能选择移动或复制;

        2) 如果你有写权限,File.canWrite 总是返回 true,即使文件仍在被复制;

        3) 等待超时或新事件发生是一种选择,但如果系统过载但复制未完成怎么办?如果超时值很大,程序会等待很长时间。

        4) 如果您只是在使用文件而不是创建文件,则无法选择写入另一个文件来“标记”复制完成。

        另一种方法是使用以下代码:

        boolean locked = true;
        
        while (locked) {
            RandomAccessFile raf = null;
            try {
                    raf = new RandomAccessFile(file, "r"); // it will throw FileNotFoundException. It's not needed to use 'rw' because if the file is delete while copying, 'w' option will create an empty file.
                    raf.seek(file.length()); // just to make sure everything was copied, goes to the last byte
                    locked = false;
                } catch (IOException e) {
                    locked = file.exists();
                    if (locked) {
                        System.out.println("File locked: '" + file.getAbsolutePath() + "'");
                        Thread.sleep(1000); // waits some time
                    } else { 
                        System.out.println("File was deleted while copying: '" + file.getAbsolutePath() + "'");
                    }
            } finally {
                    if (raf!=null) {
                        raf.close();    
                    }
                }
        }
        

        【讨论】:

          【解决方案8】:

          根据文件写入完成后您需要移动文件的紧迫程度,您还可以检查稳定的最后修改时间戳,并仅移动处于静止状态的文件。您需要它稳定的时间量可能取决于实现,但我认为具有最后修改时间戳且 15 秒未更改的东西应该足够稳定,可以移动。

          【讨论】:

            【解决方案9】:

            我知道这是一个老问题,但也许它可以帮助某人。

            我遇到了同样的问题,所以我做了以下事情:

            if (kind == ENTRY_CREATE) {
                        System.out.println("Creating file: " + child);
            
                        boolean isGrowing = false;
                        Long initialWeight = new Long(0);
                        Long finalWeight = new Long(0);
            
                        do {
                            initialWeight = child.toFile().length();
                            Thread.sleep(1000);
                            finalWeight = child.toFile().length();
                            isGrowing = initialWeight < finalWeight;
            
                        } while(isGrowing);
            
                        System.out.println("Finished creating file!");
            
                    }
            

            当文件被创建时,它会变得越来越大。所以我所做的就是比较相隔一秒的重量。应用程序将处于循环中,直到两个权重相同。

            【讨论】:

            • 不确定这是否适用于 Win7,因为在复制文件时,Win7 会在硬盘中分配所有必要的空间,然后用文件的字节“填充”它。
            【解决方案10】:

            看起来 Apache Camel 通过尝试重命名文件 (java.io.File.renameTo) 来处理文件未完成上传问题。如果重命名失败,没有读锁,但继续尝试。重命名成功后,他们将其重命名,然后继续进行预期的处理。

            请参阅下面的 operations.renameFile。以下是 Apache Camel 源代码的链接:GenericFileRenameExclusiveReadLockStrategy.javaFileUtil.java

            public boolean acquireExclusiveReadLock( ... ) throws Exception {
               LOG.trace("Waiting for exclusive read lock to file: {}", file);
            
               // the trick is to try to rename the file, if we can rename then we have exclusive read
               // since its a Generic file we cannot use java.nio to get a RW lock
               String newName = file.getFileName() + ".camelExclusiveReadLock";
            
               // make a copy as result and change its file name
               GenericFile<T> newFile = file.copyFrom(file);
               newFile.changeFileName(newName);
               StopWatch watch = new StopWatch();
            
               boolean exclusive = false;
               while (!exclusive) {
                    // timeout check
                    if (timeout > 0) {
                        long delta = watch.taken();
                        if (delta > timeout) {
                            CamelLogger.log(LOG, readLockLoggingLevel,
                                    "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                            // we could not get the lock within the timeout period, so return false
                            return false;
                        }
                    }
            
                    exclusive = operations.renameFile(file.getAbsoluteFilePath(), newFile.getAbsoluteFilePath());
                    if (exclusive) {
                        LOG.trace("Acquired exclusive read lock to file: {}", file);
                        // rename it back so we can read it
                        operations.renameFile(newFile.getAbsoluteFilePath(), file.getAbsoluteFilePath());
                    } else {
                        boolean interrupted = sleep();
                        if (interrupted) {
                            // we were interrupted while sleeping, we are likely being shutdown so return false
                            return false;
                        }
                    }
               }
            
               return true;
            }
            

            【讨论】:

              【解决方案11】:

              对于 linux 中的大文件,文件以 .filepart 的扩展名复制。您只需要使用 commons api 检查扩展并注册 ENTRY_CREATE 事件。我用我的 .csv 文件(1GB)对此进行了测试并添加它工作

              public void run()
              {
                  try
                  {
                      WatchKey key = myWatcher.take();
                      while (key != null)
                      {
                          for (WatchEvent event : key.pollEvents())
                          {
                              if (FilenameUtils.isExtension(event.context().toString(), "filepart"))
                              {
                                  System.out.println("Inside the PartFile " + event.context().toString());
                              } else
                              {
                                  System.out.println("Full file Copied " + event.context().toString());
                                  //Do what ever you want to do with this files.
                              }
                          }
                          key.reset();
                          key = myWatcher.take();
                      }
                  } catch (InterruptedException e)
                  {
                      e.printStackTrace();
                  }
              }
              

              【讨论】:

                【解决方案12】:

                如果您无法控制写入过程,请记录所有ENTRY_CREATED 事件并观察是否存在模式

                在我的例子中,文件是通过 WebDav (Apache) 创建的,并且创建了许多临时文件,但同时也为同一文件触发了 两个 ENTRY_CREATED 事件。第二个ENTRY_CREATED 事件表示复制过程完成。

                这是我的示例ENTRY_CREATED 事件。打印绝对文件路径(您的日志可能会有所不同,具体取决于写入文件的应用程序):

                [info] application - /var/www/webdav/.davfs.tmp39dee1 was created
                [info] application - /var/www/webdav/document.docx was created
                [info] application - /var/www/webdav/.davfs.tmp054fe9 was created
                [info] application - /var/www/webdav/document.docx was created
                [info] application - /var/www/webdav/.DAV/__db.document.docx was created 
                

                如您所见,我为 document.docx 收到了两个 ENTRY_CREATED 事件。在第二个事件之后,我知道文件已完成。就我而言,临时文件显然被忽略了。

                【讨论】:

                  【解决方案13】:

                  所以,我遇到了同样的问题,并为我提供了以下解决方案。 较早的尝试失败 - 尝试监控每个文件的“lastModifiedTime”统计信息,但我注意到大文件的大小增长可能会暂停一段时间。(大小不会连续变化)

                  基本思路 - 为每个事件创建一个触发器文件(在临时目录中),其名称格式如下 -

                  OriginalFileName_lastModifiedTime_numberOfTries

                  这个文件是空的,所有的剧本都在名字里。原始文件只会在经过特定持续时间的间隔后才会考虑,而不会更改其“最后修改时间”统计信息。 (注意 - 因为它是一个文件统计,所以没有开销 -> O(1))

                  注意 - 此触发器文件由不同的服务处理(例如“FileTrigger”)。

                  优势 -

                  1. 不休眠或等待保持系统。
                  2. 减轻文件观察器的工作以监控其他事件

                  FileWatcher 的代码 -

                  val triggerFileName: String = triggerFileTempDir + orifinalFileName + "_" + Files.getLastModifiedTime(Paths.get(event.getFile.getName.getPath)).toMillis + "_0"
                  
                  // creates trigger file in temporary directory
                  val triggerFile: File = new File(triggerFileName)
                  val isCreated: Boolean = triggerFile.createNewFile()
                  
                  if (isCreated)
                      println("Trigger created: " + triggerFileName)
                  else
                      println("Error in creating trigger file: " + triggerFileName)
                  

                  FileTrigger 代码(间隔时间为 5 分钟的 cron 作业) -

                   val actualPath : String = "Original file directory here"
                   val tempPath : String = "Trigger file directory here"
                   val folder : File = new File(tempPath)    
                   val listOfFiles = folder.listFiles()
                  
                  for (i <- listOfFiles)
                  {
                  
                      // ActualFileName_LastModifiedTime_NumberOfTries
                      val triggerFileName: String = i.getName
                      val triggerFilePath: String = i.toString
                  
                      // extracting file info from trigger file name
                      val fileInfo: Array[String] = triggerFileName.split("_", 3)
                      // 0 -> Original file name, 1 -> last modified time, 2 -> number of tries
                  
                      val actualFileName: String = fileInfo(0)
                      val actualFilePath: String = actualPath + actualFileName
                      val modifiedTime: Long = fileInfo(1).toLong
                      val numberOfTries: Int = fileStats(2).toInt
                  
                      val currentModifiedTime: Long = Files.getLastModifiedTime(Paths.get(actualFilePath)).toMillis
                      val differenceInModifiedTimes: Long = currentModifiedTime - modifiedTime
                      // checks if file has been copied completely(4 intervals of 5 mins each with no modification)
                      if (differenceInModifiedTimes == 0 && numberOfTries == 3)
                      {
                          FileUtils.deleteQuietly(new File(triggerFilePath))
                          println("Trigger file deleted. Original file completed : " + actualFilePath)
                      }
                      else
                      {
                          var newTriggerFileName: String = null
                          if (differenceInModifiedTimes == 0)
                          {
                              // updates numberOfTries by 1
                              newTriggerFileName = actualFileName + "_" + modifiedTime + "_" + (numberOfTries + 1)
                          }
                          else
                          {
                              // updates modified timestamp and resets numberOfTries to 0
                              newTriggerFileName = actualFileName + "_" + currentModifiedTime + "_" + 0
                          }
                  
                          // renames trigger file
                          new File(triggerFilePath).renameTo(new File(tempPath + newTriggerFileName))
                          println("Trigger file renamed: " + triggerFileName + " -> " + newTriggerFileName)
                      }    
                  }
                  

                  【讨论】:

                    猜你喜欢
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 2023-03-08
                    • 2017-03-09
                    • 1970-01-01
                    • 1970-01-01
                    • 1970-01-01
                    • 2011-11-23
                    相关资源
                    最近更新 更多