【问题标题】:Python 3: How to write to the same file from multiple processes without messing it up?Python 3:如何从多个进程写入同一个文件而不弄乱它?
【发布时间】:2017-06-07 19:53:55
【问题描述】:

我有一个可以随时启动或停止的程序。该程序用于从网页下载数据。首先,用户将在.csv 文件中定义一堆网页,然后保存.csv 文件,然后启动程序。该程序将读取.csv 文件并将其转换为作业列表。接下来,作业被分成 5 个独立的 downloader 函数,这些函数并行工作,但下载时间可能不同。

downloader(共有 5 个)完成下载网页后,我需要它打开 .csv 文件并删除链接。这样,随着时间的推移,.csv 文件会越来越小。问题是有时两个download 函数会尝试同时更新.csv 文件并导致程序崩溃。我该如何处理?

【问题讨论】:

  • 这似乎是处理问题的一种特别困难的方法。这些工作是什么?也许这是处理问题的最佳方式。使用 csv 一次来创建作业(可能存储在数据库或单独的文件中)并管理这些作业。如果你想做 csv 的事情,它应该只由一个实体(可能是处理工作的主程序)管理,该实体从工作人员发送响应并更新 csv 文件。
  • 你没有。处理来自多个(平行)控制点的副作用是灾难的根源。如果这是使其工作的唯一方法,您可以按照您当前的一些答案所建议的那样实施锁,但是这里似乎并非如此。您应该实现经理/工人模式,让经理处理 IO,将工作交给工人,并从工人那里接收结果。

标签: python multithreading parallel-processing multiprocessing


【解决方案1】:

如果这是您的project from yesterday 的延续,则您的下载列表已经在内存中 - 只需在其进程完成下载时从加载列表中删除条目,然后仅在输入文件上写下整个列表退出“下载​​器”。没有理由不断写下更改。

如果您想知道(例如从外部进程)某个 url 何时被下载,即使您的“下载器”正在运行,请在每次进程返回下载成功时在 downloaded.dat 中写入一个新行。

当然,在这两种情况下,都从主进程/线程中写入,这样您就不必担心互斥锁。

更新 - 以下是使用与昨天相同的代码库的附加文件的方法:

def init_downloader(params):  # our downloader initializator
    downloader = Downloader(**params[0])  # instantiate our downloader
    downloader.run(params[1])  # run our downloader
    return params  # job finished, return the same params for identification

if __name__ == "__main__":  # important protection for cross-platform use

    downloader_params = [  # Downloaders will be initialized using these params
        {"port_number": 7751},
        {"port_number": 7851},
        {"port_number": 7951}
    ]
    downloader_cycle = cycle(downloader_params)  # use a cycle for round-robin distribution

    with open("downloaded_links.dat", "a+") as diff_file:  # open your diff file
        diff_file.seek(0)  # rewind the diff file to the beginning to capture all lines
        diff_links = {row.strip() for row in diff_file}  # load downloaded links into a set
        with open("input_links.dat", "r+") as input_file:  # open your input file
            available_links = []
            download_jobs = []  # store our downloader parameters + a link here
            # read our file line by line and filter out downloaded links
            for row in input_file:  # loop through our file
                link = row.strip()  # remove the extra whitespace to get the link
                if link not in diff_links:  # make sure link is not already downloaded
                    available_links.append(row)
                    download_jobs.append([next(downloader_cycle), link])
            input_file.seek(0)  # rewind our input file
            input_file.truncate()  # clear out the input file
            input_file.writelines(available_links)  # store back the available links
            diff_file.seek(0)  # rewind the diff file
            diff_file.truncate()  # blank out the diff file now that the input is updated
        # and now let's get to business...
        if download_jobs:
            download_pool = Pool(processes=5)  # make our pool use 5 processes
            # run asynchronously so we can capture results as soon as they ar available
            for response in download_pool.imap_unordered(init_downloader, download_jobs):
                # since it returns the same parameters, the second item is a link
                # add the link to our `diff` file so it doesn't get downloaded again
                diff_file.write(response[1] + "\n")
        else:
            print("Nothing left to download...")

正如我在评论中所写的那样,整个想法是在下载链接时使用文件来存储下载的链接,然后在下次运行时过滤掉下载的链接并更新输入文件。这样即使你强行杀死它,它也总是会从中断的地方继续(除了部分下载)。

【讨论】:

  • 是的。我需要不断更新文件,因为可能有大约 40,000 个工作要做,每个工作大约需要一分钟,而且我的计算机在这个程序中间被关闭(或程序被中断)的可能性很高。因此我认为我应该不断更新文件。
  • @user1367204 所以你让每个工作进程解析一个 40k 条目 csv,找到它应该删除的 one 条目,然后覆盖文件?这听起来像是一场恐怖表演。有许多更好的实现,其中之一是 zwer 在此处的第二段,关于保持 csv 完整但也填充“完成”列表。
  • @user1367204 - 然后使用“diff”文件 - 为您在该文件中下载的每个链接写一个新行,然后在下次启动下载器时,在加载“输入”文件后链接,加载这个'diff'文件,从'input'链接中删除'diff'链接,用这个过滤列表保存'input'文件并将'diff'文件清空。如果您需要更好地可视化,我稍后会编写一些伪代码...
  • @zwer,感谢您的帮助。我认为diff文件可能有同样的问题,即如果多个downloader函数尝试同时写入它可能是一个问题。
  • @zwer 谢谢,我研究了你在说什么,这是最有意义的。我将避免锁定,而是使用 worker-manager 设置。
【解决方案2】:

使用多处理库中的“锁”对文件进行序列化操作。

您需要将锁传递给每个进程。每个进程都应该在打开文件之前“获取”锁,并在关闭文件后“释放”锁。

https://docs.python.org/2/library/multiprocessing.html

【讨论】:

    【解决方案3】:

    查看python中的锁定文件。锁定文件将使下一个进程等到文件解锁才能修改它。锁定文件是特定于平台的,因此您必须使用适用于您所在操作系统的任何方法。如果您需要弄清楚操作系统,请使用这样的 switch 语句。

    import os
    
    def my_lock(f):
        if os.name == "posix":
            # Unix or OS X specific locking here
        elif os.name == "nt":
            # Windows specific locking here
        else:
            print "Unknown operating system, lock unavailable"
    

    然后我会查看this article 并弄清楚你想如何实现你的锁。

    【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-03-21
    • 2021-04-05
    • 2020-10-03
    • 1970-01-01
    • 1970-01-01
    • 2011-03-13
    • 2019-07-27
    相关资源
    最近更新 更多