【问题标题】:Writing files concurrently with other cpu-bound tasks with multiprocessing or ray使用 multiprocessing 或 ray 与其他 cpu-bound 任务同时写入文件
【发布时间】:2020-05-28 17:04:08
【问题描述】:

我有一个 72 核的工作站(实际上是 36 个多线程 CPU,multiprocessing.cpu_count() 显示为 72 核)。

我尝试了multiprocessingray 进行并发处理,批量处理数百万个小文件,我想在处理期间同时编写一些输出文件。

我对与例如关联的 .get() 方法的阻塞感到困惑。 apply_async()(在multiprocessing)和ray.get()

使用ray,我有一个远程函数(process_group()),它可以在一个循环中并行处理数据组。在下文中,使用multiprocessing 模块的代码版本也以 cmets 形式给出。

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concatenate(df_list)
   ##-----------------------
   ## Write to file :
   very_big_df.to_parquet(outputfile)

因此,在每次循环迭代中,我必须收集同时计算的多个 process_group() 的输出,作为数据帧列表 df_list 连接成一个更大的 very_big_df数据框。后者需要写入磁盘(通常大小为 ~1 到 ~3 GB)。编写一个这样的文件大约需要 10-30 [s] 而处理 process_group 遥控器需要大约 180 [s]。有数千次循环迭代。所以这需要几天时间才能完成。

是否可以将文件以非阻塞方式写入磁盘,同时循环继续以节省大约 10% 的时间(这将节省大约一天的计算时间)?

到下一次循环迭代的并发进程完成时,有足够的时间来写入上一次迭代的输出。 这里涉及的内核似乎都以接近 100% 的速度运行,因此可能也不推荐使用 Threading 模块。 multiprocessing.apply_async() 更令人沮丧,因为它不想要我的不可选择的输出 very_big_df 数据框,我必须与一些更复杂的东西分享,这可能会花费我试图节省的时间我希望ray 能有效地处理类似的事情。

[更新] 为了简单起见,我没有提到所有进程之间有一个很大的共享变量(这就是为什么我称它为并行进程,以及文件的并发写入)。结果我的标题问题被编辑了。 所以实际上,在光线并行作业之前有这段代码:

shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array, data) for data in data_list])

不确定这是否使它更像是“并行”执行,而不仅仅是并发操作。

[更新 2] 共享数组是一个查找表,即就并行工作人员而言是只读的。

[更新 3] 我尝试了两种建议的解决方案:Threading 和 Ray / compute() 对于后者,建议将写函数用作遥控器,并在 for 循环中异步发送写操作,我最初认为这只能通过 .get() 来实现,这将是阻塞的。

因此,对于 Ray,这显示了两种解决方案:

@ray.remote
def write_to_parquet(df_list, filename):
    df = pd.concat(df_list)
    df.to_parquet(filename, engine='pyarrow', compression=None)

# Share array created outside the loop, read-only (big lookup table). 
# About 600 MB
shared_array_id = ray.put(shared_array)

for data_list in many_data_lists:

   new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
   write_to_parquet.remote(df_list, my_filename)

   ## Using threading, one would remove the ray decorator:
   # write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
   # write_thread.start()

对于 RAY 解决方案,这需要增加 object_store_memory,默认值是不够的:节点内存的 10% ~ 37 GB(我有 376 GB 的内存),然后上限为 20GB,唯一存储的对象总数约为22 GB:一个数据帧列表df_list(大约11 GB),以及它们在写入函数中连接的结果(大约11 GB),假设连接期间有一个副本。如果不是,那么这个内存问题没有意义,我想知道我是否可以传递 numpy 视图,我认为这是默认情况下发生的。这是 RAY 相当令人沮丧的方面,因为我无法真正预测每个 df_list 将有多少内存,它可以从 1 倍到 3 倍不等......

最后,坚持multiprocessing 使用线程是最有效的解决方案,因为处理部分(没有 I/O)更快:

from multiprocessing import Pool

# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)

def process_group(my_data):
   # Process a new dataframe here using my_data and some other data inside shared_array
   ...
   return my_df


n_workers = 60
with Pool(processes=n_workers) as pool:
   for data_list in many_data_lists:
      # data_list contains thousands of elements. I choose a chunksize of 10
      df_list = pool.map(process_group, data_list, 10)
      write_thread = threading.Thread(target=write_to_parquet, args=(group_df_list, tinterval.left))
            write_thread.start()

在每次循环迭代中,通常是len(many_data_lists) = 7000,每个列表包含 7 个大小为 (3, 9092) 的 numpy 数组。因此,这 7000 个列表将发送给 60 个工作人员:

每次循环迭代所有并行process_group 的时间:

雷:250 [s]

多处理:233 [s]

I/O:将 5GB parquet 文件写入外部 USB 3 旋转磁盘大约需要 35 秒。内部旋转盘上大约 10 秒。

Ray:使用 write_to_parquet.remote() 创建未来的开销约为 5 秒,它阻塞了循环。这仍然是在旋转磁盘上写入所需时间的 50%。这并不理想。

多处理:测量到的开销为 0 秒。

总上墙时间:

486 [s]

多处理436 [s]

我重复了几次,RayMultiprocessing 之间的差异始终显示 Multiprocessing 快了约 50 秒。这是一个显着的差异,也令人费解,因为 Ray 宣传更高的效率。

我将运行此程序进行更长时间的迭代并报告稳定性(内存、潜在的垃圾收集问题……)

【问题讨论】:

  • 写入文件是一个 I/O 密集型操作,并且应该使用很少的 CPU 时间,所以在后台线程中写入文件似乎对你有用。它还释放 GIL,因此它也不会干扰您在 Python 级别的其他处理。
  • 那么您是否建议将我的光线工作流程与线程的使用混合使用?
  • A ) pickle 问题:最好试试 Mike McKearn 的 substitute dill,最好简单地替换为 @987654357 @(它可以pickle.dump许多标准模块无法实现的结构(甚至可以有状态地{保存|加载}整个python会话这非常适合处理复杂模型并且具有可复制性渐进式快照和许多其他救生技巧等)B)性能:您介意用lstopo-no-graphics -.ascii 报告的实际计算设备的副本更新您的帖子吗?
  • @user3666197 这个命令的结果在我的终端中是巨大的。不知道你想如何用什么信息更新我的帖子。它甚至不适合屏幕截图。 (列表很长,毕竟有 72 个核心要描述......)
  • @user3666197 啊,你的意思是这个调用:very_big_df.to_parquet(outputfile) 在实际写入之前正在做一堆预处理?然后我同意这意味着 GIL 将在后台线程中保留大量时间。我不清楚这如何干扰process_group 的工作,因为它在不共享 GIL 的单独进程中运行。似乎通过网络将数据发送到预处理/写入另一个系统是避免 to_parquet 调用与 CPU 周期的计算工作竞争的唯一方法。

标签: python multiprocessing file-writing ray concurrent-processing


【解决方案1】:

您是否考虑过为将数据写入文件的 ray 任务分配 1 个核心?

[更新]原型

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)

@ray.remote
def write_to_parquet(data, filename):
    # write it until succeed.
    # record failed write somewhere. 
    # I assume failure to write is uncommon. You can probably just 
    # write ray.put() and have one background process that keeps failed 
    # write again.

# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concatenate(df_list)
   ##-----------------------
   ## Write to file :

   write_to_parquet.remote(very_big_df, filename)

【讨论】:

  • 当然,但是文档说每次调用 get() 时它都是一个阻塞操作,所以我很困惑,这似乎违背了目的。
  • 不确定我是否理解正确,但您不需要调用 ray.get() 来使远程任务正常工作。调用 .remote() 后,它会被异步调度并在集群中运行。 ray.get() 用于从远程任务的对象存储(通常是返回对象)中获取对象。如果你的意思是你想确保使用ray.get() 成功写入,你可以有一个后台线程定期检查你的任务结果。如果失败则重新安排。为此,您可以有一个 while 循环并使用 ray.wait()
  • 我认为 .remote() 只是在创建一个未来,返回一些我们打算稍后使用 .get() 计算的任务 ID。让我在不调用 get 的情况下尝试您所说的内容,看看文件是否被写入,即使我没有在此任务 ID 上调用 .get()。
  • 可以通过修改object_store_memoryray.init(object_store_memory=[bigger memory than the max size of your data frame])ray.readthedocs.io/en/latest/package-ref.html来增加对象内存大小,Ray支持对未使用(尚未完全使用)的对象进行垃圾回收。请注意,如果您的代码不在 Ray 垃圾回收的范围内,您仍然可能会遇到 OOM 问题。在这里查看更多详情。 stackoverflow.com/questions/60175137/…
  • 听起来不错。如果您想要更好的支持或信息,我建议您加入 Ray public slack。您可以在 ray repo 的 README.md 底部找到邀请链接。 github.com/ray-project/ray/blob/master/README.rst
猜你喜欢
  • 1970-01-01
  • 2010-11-20
  • 2021-03-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-03-21
  • 2015-01-31
  • 1970-01-01
相关资源
最近更新 更多