【问题标题】:Hashing (sha1) multiple file concurrently using threads使用线程同时散列(sha1)多个文件
【发布时间】:2014-09-06 22:24:14
【问题描述】:

我有 N 个大文件(不少于 250M)要散列。这些文件位于 P 个物理驱动器上。

我想用最大 K 个活动线程同时对它们进行哈希处理,但我不能对每个物理驱动器进行超过 M 个文件的哈希处理,因为它会减慢整个过程(我运行了一个测试,解析 61 个文件,并使用 8 个线程)比 1 个线程慢;文件几乎都在同一个磁盘上)。

我想知道最好的方法是什么:

  • 我可以使用 Executors.newFixedThreadPool(K)
  • 然后我会使用计数器提交任务以确定是否应该添加新任务。

我的代码是:

int K = 8;
int M = 1;
Queue<Path> queue = null; // get the files to hash
final ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(K);
final ConcurrentMap<FileStore, Integer> counter = new ConcurrentHashMap<>();
final ConcurrentMap<FileStore, Integer> maxCounter = new ConcurrentHashMap<>();
for (FileStore store : FileSystems.getDefault().getFileStores()) {
  counter.put(store, 0);
  maxCounter.put(store, M);
}
List<Future<Result>> result = new ArrayList<>();
while (!queue.isEmpty()) {
  final Path current = queue.poll();
  final FileStore store = Files.getFileStore(current);
  if (counter.get(store) < maxCounter.get(store)) {
    result.add(newFixedThreadPool.submit(new Callable<Result>() {

      @Override
      public Entry<Path, String> call() throws Exception {
        counter.put(store, counter.get(store) + 1);
        String hash = null; // Hash the file
        counter.put(store, counter.get(store) - 1);
        return new Result(path, hash);
      }

    }));
  } else queue.offer(current);
}

抛开潜在的非线程安全操作(比如我如何玩计数器),有没有更好的方法来实现我的目标?

我也认为这里的循环可能有点太多了,因为它可能会占用很多进程(几乎就像一个无限循环)。

【问题讨论】:

  • 您知道哪个文件在哪个驱动器上吗?我只需为每个驱动器分配一个线程,然后让它处理该驱动器上的所有文件。
  • 是的,每个驱动器一个线程可能是要走的路。
  • 是的。事实上,FileStore 的东西为您提供了逻辑驱动器(可能是也可能不是物理驱动器)。最终,我想在 NAS 上运行我的代码,该 NAS 具有 5 个驱动器的 RAID,这将允许对更多文件进行哈希处理。
  • 作为旁注使用do{int count = counter.get(store);}while(!counter.replace(store, count,count+1));而不是get/put(替换为concurrentmap的CompareAndSwap)
  • 我同意:每个驱动器一个可防止磁头移位。优先队列也会很好,以防止拥塞。基于队列时间和负文件大小的优先级:如处理的累积大小 - 队列条目的文件大小。

标签: java multithreading sha1 java-7


【解决方案1】:

如果在编译时不知道驱动器硬件配置,并且可能会更改/升级,则很有可能使用每个驱动器的线程池并使线程数可由用户配置。我对“newFixedThreadPool”不熟悉 - 线程数是可以在运行时更改以优化性能的属性吗?

【讨论】:

  • 我简化了我的示例,但我打算让用户(我自己^^)更改最大线程数(如果是超线程,则为核心数 * 2)和每个磁盘的最大线程数。
【解决方案2】:

经过很长时间,我找到了一个解决方案来满足我的需求:我使用了 ExecutorService 而不是整数计数器或 AtomicInteger 或其他任何东西,并且每个提交的任务都使用 Semaphore 在每个任务之间共享一个驱动器的文件。

喜欢:

ConcurrentMap<FileStore, Semaphore> map = new ConcurrentHashMap<>();
ExecutorService es = Executors.newFixedThreadPool(10);
for (Path path : listFile()) {
  final FileStore store = Files.getFileStore(path);
  final Semaphore semaphore = map.computeIfAbsent(store, key -> new Semaphore(getAllocatedCredits(store)));
  final int cost = computeCost(path);
  es.submit(() -> {
    semaphore.acquire(cost);
    try {
      ... some work ...
    } finally {
      semaphore.release(cost);
    }
  });
}


int getAllocatedCredits(FileStore store) {return 2;}
int computeCost(Path path) {return 1;}

注意 Java 8 的帮助,尤其是在 computeIfAbsentsubmit 中。

【讨论】:

    猜你喜欢
    • 2011-03-11
    • 2015-05-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-04-06
    • 2021-01-20
    • 1970-01-01
    相关资源
    最近更新 更多