【问题标题】:F# Array.Parallel.map does not provide parallel processingF# Array.Parallel.map 不提供并行处理
【发布时间】:2020-11-14 07:24:01
【问题描述】:

我必须在 F# 中模拟一个离散环境,由 Python 调用,以解决强化学习问题。我有一个原始类型(主要是浮点数)的函数,可以使数据交换更顺畅。现在我可以用不同的数据多次运行这个函数,所以并行运行它似乎是个好主意。

我有以下代码:

type AscentStrategy = |Strategy of seq<float> 

let simulateAscent  env ascentLimiter initState (sequenceOfDepths:seq<float>)  =   
    //let infinitSeqOfConstantValues = (fun _ -> constantDepth) |> Seq.initInfinite
    sequenceOfDepths 
    |> Seq.scan ( fun ( nextState, rew, isTerminal, _ )  depth -> getNextEnvResponseAndBoundForNextAction(env, nextState , depth , ascentLimiter)  ) (  initState, 0.0 , false, 0.0)  
    |> SeqExtension.takeWhileWithLast (fun (_ , _, isTerminalState, _) ->  not isTerminalState)
    |> Seq.toArray

and then 

    let simulateStrategy ({MaxPDCS = maxPDCS ; MaxSimTime = maximumSimulationTime ; PenaltyForExceedingRisk = penaltyForExceedingRisk ; 
                       RewardForDelivering = rewardForDelivering ; PenaltyForExceedingTime = penaltyForExceedingTime ; IntegrationTime = integrationTime 
                       ControlToIntegrationTimeRatio = controlToIntegrationTimeRatio; DescentRate = descentRate; MaximumDepth = maximumDepth ; 
                       BottomTime = bottomTime ; LegDiscreteTime = legDiscreteTime } : SimulationParameters) (Strategy ascentStrategy : AscentStrategy) = 
    
    let env, initState ,  ascentLimiter , _  =  getEnvInitStateAndAscentLimiter  ( maxPDCS    , maximumSimulationTime , 
                                                                           penaltyForExceedingRisk ,  rewardForDelivering , penaltyForExceedingTime , 
                                                                           integrationTime  ,
                                                                           controlToIntegrationTimeRatio,  
                                                                           descentRate , 
                                                                           maximumDepth  , 
                                                                           bottomTime  , 
                                                                           legDiscreteTime   ) 
    ascentStrategy
    |> simulateAscent  env ascentLimiter initState

最后我调用了这个函数进行测试:

 let commonSimulationParameters = {MaxPDCS = 0.32 ; MaxSimTime = 2000.0 ; PenaltyForExceedingRisk  = 1.0 ; RewardForDelivering = 10.0; PenaltyForExceedingTime = 0.5 ;
                                      IntegrationTime = 0.1; ControlToIntegrationTimeRatio = 10; DescentRate = 60.0; MaximumDepth = 20.0 ; BottomTime = 10.0;  LegDiscreteTime = 0.1}


    printfn"insert number of elements"
    let maxInputsString = Console.ReadLine()
    let maxInputs = maxInputsString |> Double.Parse
    let inputsStrategies =  [|0.0 .. maxInputs|] |> Array.map (fun x -> Seq.initInfinite (fun _ -> x ) )
    let testParallel = inputsStrategies 
                   |> Array.Parallel.map (fun x -> (simulateStrategy commonSimulationParameters ( Strategy x )) ) 

我已经将它与 Array.map 进行了比较,虽然它更快并且使用了我笔记本电脑上 70% 的 CPU,但似乎仍然没有使用全部处理能力。我已经在具有更多内核(约 50 个)的机器上运行它,它几乎不会增加 CPU 使用率(它达到了总使用量的 3/4%,有 50 个独立输入)。我认为一定是某个地方产生了死锁,但是我该如何检测并摆脱它呢?

另外,为什么会这样?在我看来,函数式编程的优点之一就是能够轻松地进行并行化。

PS:SeqExtension.takeWhileWithLast 是我在 SO 上找到的一个函数,由 Tomas Petricek 在他的精彩答案之一中提供,如果需要我可以发布它。

PPS:env是环境,其类型定义为:

type Environment<'S, 'A ,'I>         =   |Environment of (State<'S> -> Action<'A> -> EnvironmentOutput<'S ,'I>)

我对 Async.Parallel 和 ParallelSeq 进行了同样的尝试,报告了同样的问题。

基于消息的解决方案能否解决问题>?我正在研究它,虽然我一点也不熟悉,但它是使用 MailboxProcessor 使代码并行的好方法吗?


根据我的问题, 我也尝试过这个很棒的基于数据流的并行代码库。 https://nessos.github.io/Streams/.

我添加了以下代码:

let nessosResult = inputsStrategies
                    |> ParStream.ofArray
                    |> ParStream.map simulateStrategy
                    |> ParStream.toArray

我已经为 inputStrategy 定义了一个临时类型(我拥有的基本的旧元组),以便模拟策略只接受一个输入。不幸的是,这个问题似乎很好地隐藏在某个地方。我附上了一张关于 CPU 使用率的图表。在不同情况下在我的机器上花费的时间是:~8.8 秒(连续); ~6.2 秒(Array.Parallel.map); ~ 6.1 秒 (Nessos.Streams)

【问题讨论】:

  • inputsStrategies 是从哪里来的?它们是如何生成的?
  • @FyodorSoikin 它们是用于测试的玩具序列。我生成 n 个具有恒定值的序列。我添加了用于测试代码是否并行运行的玩具代码。谢谢
  • Parallel 确实使用了多个内核,但隐藏的锁(例如您似乎正在使用的 Seq 中)可能会限制它实现的可能的并行性。如果您无法创建您在此处共享的最小可重现程序,那么我建议您尝试从例如Parallel.For 开始简单并观察它实现的并行性并从那里扩展,直到您了解是什么杀死了并行性。边注;我曾经对并行实现性能感到非常兴奋,但得出的结论是这很难,而且您可能无法负担得起好的抽象。
  • 谢谢。恐怕你是对的。我的代码有点抽象(可能太多了,但我想要一些可以在很多方面重用的东西);我不知道这会产生并行性问题。不过,Brianberns 的回答很有帮助。谢谢

标签: parallel-processing functional-programming f# deadlock mailboxprocessor


【解决方案1】:

我发现server garbage collection 是在 .NET 上获得最佳并行性能所必需的。在你的 app.config 中有这样的东西:

<configuration>
  <runtime>
    <gcServer enabled="true" />
  </runtime>
</configuration>

【讨论】:

  • 它并没有解决整个问题,但确实改善了时间。在我的笔记本电脑上,它似乎很好地利用了所有 4 个 CPU,在具有 80 个内核的 Xeon 上,它仍然缺乏完全的并行性,但在时间上它从 ​​2X(wrt 到顺序解决方案)到 10X,这已经是一个很好的改进.谢谢
猜你喜欢
  • 1970-01-01
  • 2010-10-15
  • 2020-08-19
  • 1970-01-01
  • 2019-05-22
  • 2013-03-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多