【问题标题】:Python multiprocess/multithreading to speed up file copyingPython多进程/多线程加速文件复制
【发布时间】:2017-11-03 08:34:56
【问题描述】:

我有一个程序可以将大量文件从一个位置复制到另一个位置 - 我说的是 100,000 多个文件(我正在复制图像序列中的 314g)。他们都在极端的巨大,非常快速的网络存储上。我正在使用shutil按顺序复制文件,这需要一些时间,所以我试图找到优化它的最佳方法。我注意到一些我有效地使用多线程从网络读取文件的软件在加载时间上有很大的提高,所以我想尝试在 python 中执行此操作。

我没有编程多线程/多进程的经验 - 这似乎是正确的领域吗?如果是这样,最好的方法是什么?我查看了其他一些关于在 python 中进行线程文件复制的 SO 帖子,他们似乎都说你没有速度提升,但考虑到我的硬件,我认为情况并非如此。目前,我的 IO 上限还差得很远,资源仅占 1% 左右(我在本地有 40 个内核和 64g 的 RAM)。

【问题讨论】:

  • 你看过threading.Thread吗? docs.python.org/2/library/threading.html您可以创建多个线程,启动并加入它们,我不确定这是否会有所帮助,但这是我唯一能想到的。
  • 嘿胡安,我当然可以深入研究这个。我想我的问题更多的是是否值得教自己如何做到这一点,而最终它甚至可能不会更快。换句话说,有没有人有使用线程加速复制时间的经验?
  • 嗯,基于link,我认为多处理会比线程更好,因为“进程具有独立的 I/O 调度。”
  • repl.it/I2hT/0你可以试试这样的,不是太复杂,我没用过多处理,就是多线程。希望对您有所帮助。
  • @JohnMee 终于成功了!对我来说,最佳点是大约 16 个核心。我实际上看到 20 后速度有所下降。stackoverflow.com/questions/8584797/…

标签: python multithreading shutil


【解决方案1】:

这可以通过在 Python 中使用 gevent 来并行化。

我会推荐以下逻辑来实现加速 100k+ 文件复制

  1. 将需要复制的所有 100K+ 文件的名称放入 csv 文件中,例如:'input.csv'。

  2. 然后从该 csv 文件创建块。应根据机器中的处理器/内核数来确定块的数量。

  3. 将每个块传递给单独的线程。

  4. 每个线程按顺序读取该块中的文件名并将其从一个位置复制到另一个位置。

这里是python代码sn-p:

import sys
import os
import multiprocessing

from gevent import monkey
monkey.patch_all()

from gevent.pool import Pool

def _copyFile(file):
    # over here, you can put your own logic of copying a file from source to destination

def _worker(csv_file, chunk):
    f = open(csv_file)
    f.seek(chunk[0])
    for file in f.read(chunk[1]).splitlines():
        _copyFile(file)


def _getChunks(file, size):
    f = open(file)
    while 1:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline()
        yield start, f.tell() - start
        if not s:
            f.close()
            break

if __name__ == "__main__":
    if(len(sys.argv) > 1):
        csv_file_name = sys.argv[1]
    else:
        print "Please provide a csv file as an argument."
        sys.exit()

    no_of_procs = multiprocessing.cpu_count() * 4

    file_size = os.stat(csv_file_name).st_size

    file_size_per_chunk = file_size/no_of_procs

    pool = Pool(no_of_procs)

    for chunk in _getChunks(csv_file_name, file_size_per_chunk):
        pool.apply_async(_worker, (csv_file_name, chunk))

    pool.join()

将文件另存为 file_copier.py。 打开终端并运行:

$ ./file_copier.py input.csv

【讨论】:

  • 感谢您的详细回复!我将不得不花一些时间来解决这个问题,并确保我理解了一切。不过,在我有机会这样做之前,我忘了提到有时我只有几个文件要处理 - 所以它会从几个文件到 100k+ 不等。这样做会显着降低速度吗?我想我总是可以设置一个阈值,如果它超过 n 个文件,那么多线程它。
  • 不客气! :) 我希望我的解决方案对您有所帮助。是的,与顺序处理相比,您会发现速度显着降低。我觉得为不同的文件数量设置一个阈值可能是个好主意。关于代码sn-p,可以设置n = no_of_cores * 4。
【解决方案2】:

更新:

我从来没有让 Gevent 工作(第一个答案),因为我无法在没有互联网连接的情况下安装模块,而我的工作站上没有互联网连接。但是,仅使用 python 的内置线程(我已经学会了如何使用它),我就能够将文件复制时间减少 8 倍,我想将其发布为任何有兴趣的人的附加答案!下面是我的代码,可能需要注意的是,由于您的硬件/网络设置,我的 8 倍复制时间很可能因环境而异。

import Queue, threading, os, time
import shutil

fileQueue = Queue.Queue()
destPath = 'path/to/cop'

class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    lock = threading.Lock()

    def __init__(self):
        with open("filelist.txt", "r") as txt: #txt with a file per line
            fileList = txt.read().splitlines()

        if not os.path.exists(destPath):
            os.mkdir(destPath)

        self.totalFiles = len(fileList)

        print str(self.totalFiles) + " files to copy."
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath)
            fileQueue.task_done()
            with self.lock:
                self.copyCount += 1
                percent = (self.copyCount * 100) / self.totalFiles
                print str(percent) + " percent copied."

    def threadWorkerCopy(self, fileNameList):
        for i in range(16):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

【讨论】:

  • 此解决方案适用于 1 个文件。我如何能够循环多个带有路径的文件?我尝试循环遍历这段代码,但在某些时候,我会收到一个错误“无法启动新线程”
  • 嘿山姆,你是对的,这里缺少前面的步骤。请注意,有一个名为“fileQueue”的队列对象,在运行线程之前需要使用包含源文件和目标文件的元组来填充它。类似 fileQueue.put(("path/to/source/file.txt", "path/to/dest/file.txt")) 的东西。查看Queue 上的文档(非常简单)。
  • 非常感谢 Spencer 抽出时间回复我的评论。
  • @Spencer:我收到错误:OSError: [Errno 24] 打开的文件太多,你知道为什么吗?
  • 那么 16 是否意味着 16 个核心?
【解决方案3】:

在重新实现@Spencer 发布的代码时,我遇到了与帖子下方的 cmets 中提到的相同的错误(更具体地说:OSError: [Errno 24] Too many open files)。 我通过远离守护线程并改用concurrent.futures.ThreadPoolExecutor 解决了这个问题。这似乎以更好的方式处理要复制的文件的打开和关闭。通过这样做,除了现在看起来像这样的threadWorkerCopy(self, filename_list: List[str]) 方法之外,所有代码都保持不变:

    def threadWorkerCopy(self, filename_list: List[str]):
    """
    This function initializes the workers to enable the multi-threaded process. The workers are handles automatically with
    ThreadPoolExecutor. More infos about multi-threading can be found here: https://realpython.com/intro-to-python-threading/.
    A recurrent problem with the threading here was "OSError: [Errno 24] Too many open files". This was coming from the fact
    that deamon threads were not killed before the end of the script. Therefore, everything opened by them was never closed.

    Args:
        filename_list (List[str]): List containing the name of the files to copy.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
        executor.submit(self.CopyWorker)

        for filename in filename_list:
            self.file_queue.put(filename)
        self.file_queue.join()  # program waits for this process to be done.

【讨论】:

    【解决方案4】:

    使用ThreadPool 怎么样?

    import os
    import glob
    import shutil
    from functools import partial
    from multiprocessing.pool import ThreadPool
    
    DST_DIR = '../path/to/new/dir'
    SRC_DIR = '../path/to/files/to/copy'
    
    # copy_to_mydir will copy any file you give it to DST_DIR
    copy_to_mydir = partial(shutil.copy, dst=DST_DIR))
    
    # list of files we want to copy
    to_copy = glob.glob(os.path.join(SRC_DIR, '*'))
    
    with ThreadPool(4) as p:
      p.map(copy_to_mydir, to_copy)
    
    

    【讨论】:

      【解决方案5】:

      如果您只想将目录树从一个路径复制到另一个路径,这是我的解决方案,它比以前的解决方案更简单。它利用multiprocessing.pool.ThreadPool 并为shutil.copytree 使用自定义复制功能:

      import shutil
      from multiprocessing.pool import ThreadPool
      
      
      class MultithreadedCopier:
          def __init__(self, max_threads):
              self.pool = ThreadPool(max_threads)
      
          def copy(self, source, dest):
              self.pool.apply_async(shutil.copy2, args=(source, dest))
      
          def __enter__(self):
              return self
      
          def __exit__(self, exc_type, exc_val, exc_tb):
              self.pool.close()
              self.pool.join()
      
      
      src_dir = "/path/to/src/dir"
      dest_dir = "/path/to/dest/dir"
      
      
      with MultithreadedCopier(max_threads=16) as copier:
          shutil.copytree(src_dir, dest_dir, copy_function=copier.copy)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2012-01-24
        • 1970-01-01
        • 2019-04-22
        • 1970-01-01
        • 2012-05-17
        • 1970-01-01
        相关资源
        最近更新 更多