【问题标题】:Parallel computation in Julia with large dataJulia 中的大数据并行计算
【发布时间】:2015-10-18 10:00:46
【问题描述】:

首先我的问题:

  • 是否可以防止 Julia 每次在并行 for 循环中复制变量?
  • 如果没有,如何在 Julia 中实现并行 reduce 操作?

现在详细介绍:

我有这个程序:

data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
  fct(data)::DataFrame
end

它在功能方面工作得很好,但是在另一个工作人员上对 fct(data) 的每次并行调用都会复制整个数据帧,让一切都变得非常缓慢。

理想情况下,我想加载一次数据,并始终在每个工作人员上使用每个预加载的数据。 我想出了这个代码来做到这一点:

@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
  if (myid()-1) % nworkers()
    fct = filter_functions[i]
    filtered_data_temp = fct(data)
  end
  # How to vcat all the filtered_data_temp ?
end

但现在我遇到了另一个问题:我无法弄清楚如何使用 myid()==1 将所有过滤数据临时变量 vcat() 到工作程序中的变量上。

我将非常感谢任何见解。

注意:我知道Operating in parallel on a large constant datastructure in Julia。然而,我不相信它适用于我的问题,因为我所有的 filter_functions 都在整个数组上运行。

【问题讨论】:

    标签: parallel-processing julia


    【解决方案1】:

    您可能希望查看/加载您的数据到Distributed Arrays

    编辑:大概是这样的:

    data = DataFrames.readtable("...")
    dfiltered_data = distribute(data) #distributes data among processes automagically
    filter_functions = [ fct1, fct2, fct3 ... ] 
    for fct in filter_functions
      dfiltered_data = fct(dfiltered_data)::DataFrame
    end
    

    您也可以查看unit tests 了解更多示例

    【讨论】:

    • 看起来不错。让我检查一下。
    • 您可能还想考虑SharedArrays,如果您的所有数据都从一个进程开始,并且您不想为将它们转移到另一个进程而付出代价。
    • 很好,但是(1)我认为它使用共享内存,这可能不适用于分布式集群(2)“分布式”似乎不支持数据帧(仅限数组)。因为,您的示例以及单元测试的链接非常有启发性。
    【解决方案2】:

    毕竟,我在那里找到了我的问题的解决方案:Julia: How to copy data to another processor in Julia

    特别是,它引入了以下原语,以便从另一个进程中检索变量:

    getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
    

    以下是我的使用方法:

    @everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
    @everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
    # Executes the filter functions
    @everywhere for i in 1:length(filter_functions)
      local_results = ... # some type
      if (myid()-1) % nworkers()
        fct = filter_functions[i]
        filtered_data_temp = fct(data)
        local_results = vcat(local_results, filtered_data_temp)
      end
      # How to vcat all the filtered_data_temp ?
    end
    # Concatenate all the local results
    all_results = ... # some type
    for wid in 1:workers()
      worker_local_results = getfrom(wid, :local_results)
      all_results = vcat(all_results,worker_local_results)
    end
    

    【讨论】:

    • 虽然更进一步,看来我也可以使用 pmap()。
    猜你喜欢
    • 2015-10-17
    • 2015-09-09
    • 1970-01-01
    • 1970-01-01
    • 2017-08-22
    • 1970-01-01
    • 1970-01-01
    • 2011-02-25
    • 2020-05-29
    相关资源
    最近更新 更多