【问题标题】:mpi4py: dynammic data processingmpi4py:动态数据处理
【发布时间】:2014-08-05 03:21:59
【问题描述】:

我有一个包含股票代码的向量,例如tickers = ['AAPL','XOM','GOOG'],在我的“传统”python 程序中,我将循环这个tickers 向量,选择一个代码字符串,例如AAPL,导入一个包含@987654325 的csv 文件@stock 收益,将收益作为普通函数的输入,最后生成一个 csv 文件作为输出。我有超过 4000 个代码,并且应用于每个代码的功能需要时间来处理。我可以访问带有mpi4py 包的计算机集群,每个作业可以访问大约100 个处理器。我很好理解(并且能够实现)这个mpiexample在python中:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if rank == 0:
    data = [i for i in range(8)]
# dividing data into chunks
    chunks = [[] for _ in range(size)]
    for i, chunk in enumerate(data):
        chunks[i % size].append(chunk)
else:
    data = None
    chunks = None
data = comm.scatter(chunks, root=0)
print str(rank) + ': ' + str(data)

[cha@cluster] ~/utils> mpirun -np 3 ./mpi.py 
2: [2, 5]
0: [0, 3, 6]
1: [1, 4, 7]

所以在这个例子中,我们有一个大小为 8 的数据向量,并为每个处理器(总共 3 个)分配相同数量的数据元素。我如何使用上面类似的示例并为每个处理器分配一个股票代码并应用需要为每个代码运行的功能?我如何告诉 python,一旦处理器空闲,返回tickers 向量并处理尚未处理的ticker

【问题讨论】:

    标签: python dynamic mpi mpi4py


    【解决方案1】:

    还有另一种思考方式。您有 100 个处理器处理 4000 个数据块。您可以查看的一种方法是,每个处理器都获得一个数据块来进行操作。平均分配,每个处理器将获得 40 个代码来处理。处理器 1 将获得 0-39,处理器 2 将获得 40-79,依此类推。

    以这种方式思考,您无需担心处理器完成其任务时会发生什么。只要有一个循环:

    block_size = len(tickers) / size # this will be 40 in your example
    for i in range(block_size):
        ticker = tickers[rank * block_size + i]
        process(ticker)
    
    def process(ticker):
        # load data
        # process data
        # output data
    

    这有意义吗?

    [编辑]
    如果您想了解更多信息,这实际上只是 row-major order 索引的一种变体,这是一种访问存储在单维内存中的多维数据的常用方法。

    【讨论】:

    • 感谢您的见解。今天晚些时候我会试试你的建议!谢谢
    • 所以你用你建议的正确替换所有这些:data = [i for i in range(8)] # dividing data into chunks chunks = [[] for _ in range(size)] for i, chunk in enumerate(data): chunks[i % size].append(chunk) else: data = None chunks = None?而 process(.) 是我处理代码返回的函数吗?
    • 如果您想实际看到这一点,只需将process 调用替换为print('%d - %s' % (rank, my_ticker_string)),我认为这将消除您可能遇到的任何困惑。
    • 好的,我明白了...但是我应用于每个代码的功能:加载 .txt 文件,处理数据,并为每个代码以 CSV 格式输出所有内容...在代码中的位置我可以应用此数据处理吗?想象一下,我用函数def process_job(ticker) 创建了另一个文件,然后导入了这个函数……在您建议的代码中,我必须在哪里编写process_job 函数才能获得所需的输出?感谢杰夫的帮助
    • 是的,这正是我加入 process 的原因。我在我的代码中调用的 process 就是你调用的 process_job。只需在同一个脚本中定义该函数,您就可以像我在 for 循环中显示的那样调用它。我将编辑上面的示例来演示这一点。
    猜你喜欢
    • 2015-06-29
    • 2014-08-31
    • 2017-03-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多