【问题标题】:How to atomically modify a value within a mutable array?如何原子地修改可变数组中的值?
【发布时间】:2021-10-11 07:39:11
【问题描述】:

我在 ST monad 中有这个可变数组。我有这个循环功能。

 runST $ do
    myarray <- newMArray (Sz 10) 0
    loopM_ 0 (<10) (+1) (\j ->
      loopM_ 0 (<10) (+1) (\i ->
        when (mytruthcheck j i)
             (modifyM_ myarray (pure . (+1)) ((funcofji j i) :: Int)
     )))

我想使用forkST_ 像这样并行运行外循环。

runST $ do
   myarray <- newMArray (Sz 10) 0
   loopM_ 0 (<10) (+1) (\j ->
     void (forkST_ (loopM_ 0 (<10) (+1) (\i ->
       when (mytruthcheck j i)
            (Data.Massiv.Array.Mutable.modifyM_
                 myarray (pure . (+1)) ((funcofji j i) :: Int)
    ))))

但我猜这会导致线程冲突,但我真的不知道,虽然我知道funcofji 可以为j 的不同值输出相同的值,因此循环可以修改myarray 的相同索引用于不同的 j s。有没有办法确保这是以原子方式完成的,还是已经如此?

顺便说一句,这是loopM_ 函数

loopM_ :: Monad m => Int -> (Int -> Bool) -> (Int -> Int) -> (Int -> m a) -> m ()
loopM_ !init' condition increment f = go init'
  where
    go !step
      | condition step = f step >> go (increment step)
      | otherwise = pure ()

【问题讨论】:

  • 很确定如果您要强制执行原子性,那么缓存惩罚将破坏任何多线程优势。只有在每个数组元素上计算一个非常昂贵的函数时,它才能得到回报。
  • 如果数组大小不是太大,那么你可以用不同的数组计算并行部分,然后将它们合并在一起(有一个幺半群)。
  • 如果你想要并行化(不是并发),那么forkST/forkIO 产生的 Haskell 线程的数量应该与你机器上的 CPUs/Cores 的数量有关。否则将是开销。
  • @Anon 没有。forkST 没有问题,你打电话给forkST 的数量有问题。 forkOn 帮不了你。你应该在一些计数上分割你的迭代范围(可以等于你机器上 CPU 的计数),然后在子范围上分叉循环。
  • 实现的另一个问题是您没有等待所有并行工作完成。

标签: haskell concurrency parallel-processing massiv


【解决方案1】:

正如 cmets 中提到的,原子修改仅对并发有用 这看起来不像这里需要的。您需要的是并行性。,在 massiv 中可用于 Ints:atomicAddIntArray

massiv 中还有一个内置的方法可以非常有效地进行并行处理,因此绝对不需要重新发明轮子:

  createArray_ Par (Sz 10) $ \scheduler myarray ->
    loopM_ 0 (<10) (+1) $ \j ->
      loopM_ 0 (<10) (+1) $ \i ->
        when (mytruthcheck j i) $ 
          scheduleWork_ scheduler $ 
            void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1

也不要自欺欺人,ST(状态线程)不是为多线程构建的,而是使用 IO。但是,如果您可以保证,尽管进行了多线程设置,但最终产生的结果仍然是确定性的,那么使用unsafePerformIO 是可以的。

编辑

我刚刚注意到这条评论:

j的循环大小接近100,000,i的循环大小接近10亿。

这让我相信以这种方式并行化它会更好:

  createArray_ Par (Sz 10) $ \scheduler myarray ->
    iforSchedulerM_ scheduler (0 ..: (100000 :. 1000000000)) $ \_ (j :. i) ->
      when (mytruthcheck j i) $ 
        void $ atomicAddIntArray myarray ((funcofji j i) :: Int) 1

这将确保您只安排几个工作,而不是数十亿。如果您对每个ij 工作负载有更深入的了解,请查看iforSchedulerM_ 实现以自定义并行化。

【讨论】:

  • 我认为这将无法正常工作,因为modifyM_ 不是原子的。这是@Anon 试图解决的问题。如果 funcofji 返回不同的索引,该解决方案可能是正确的。
  • 我明白了,那么使用原子增量就可以了。我进行了编辑,但在争论性能之前,我建议对其进行基准测试,我更确定它会很好。
  • 我同意你的看法。最好先编写一个基准,然后尝试将性能从简单的解决方案提高到复杂的解决方案。
  • @lehins CPU 核心利用率通常接近 100%,所有核心都在 100% 和 70% 之间波动。所以这是一个好兆头。唯一的缺点是它使用了 3 倍的内存。现在这不是一个真正的问题,但它可能取决于几件事。我将尝试更改 RTS 内存设置,看看是否可以在使用更少内存的同时保持 cpu 性能。
  • @Anon 我添加了另一个解决方案,我认为它应该更适合您的工作量。
【解决方案2】:

作为 cmets 中讨论的结果,我写了一个原型,说明这可能是怎样的,也许它会有用(我没有尝试编译,所以可能存在一些类型/语法错误)。

runST $ do
    let arrsz = 10 :: Int -- depends on codomain of funcofji
    let ncaps = 8 :: Int64 -- see also getNumCapabilities
    let outerLoopSize = 10^5  :: Int64
    let innerLoopSize = 10^12 :: Int64
    let chunksz = ceiling $ fromIntegral outerLoopSize / fromIntegral ncaps
    
    sync <- newEmptyMVar
    forM_ [0 .. ncaps - 1] $ \k -> forkST_ $ do
        localArr <- newMArray (Sz arrsz) 0
        forM_ [k * chunksz .. min outerLoopSize ((k + 1) * chunksz) - 1] $ \j -> do
            forM_ [0 .. innerLoopSize - 1] $ \i -> do
                when (mytruthcheck j i) $
                    modifyM_ localArr (pure . (+1)) $ funcofji j i
        putMVar sync localArr

    resultArr <- takeMVar sync
    replicateM_ (ncaps - 1) $ do
        localArr <- takeMVar sync
        forM_ [0 .. arrsz - 1] $ do \ix ->
            elm <- readM localArr ix
            modifyM_ resultArr (pure . (+elm)) ix

    ...

【讨论】:

  • @freestyles 我注意到formM_ [0 .. ncaps -1] 应该是[0 .. ncaps],这样outerLoopSize 的其余部分将在最后一个chunksz 结束后运行
  • @Anon 是的,你是对的,我的错。我通过将它们添加到块中来解决剩余问题。这是可能的,因为据我了解迭代的顺序无关紧要。
  • 另一种方法可能是除以(ncaps -1),然后从[0..naps-1] 开始,所以列表中仍然有 8 个线程,但有 7 个大小相等的块和一个 1 个剩余块
  • @Anon 是,但ncaps 可以是1nremainders 可以是0。所以可能需要类似:let chunksz = let (sz, nrems) = divMod outerLoopSize ncaps in if 0 == nrems then sz else div outerLoopSize (ncaps - 1).
  • @Anon 或像这样:let chunksz = ceiling $ fromIntegral outerLoopSize / fromIntegral ncaps
【解决方案3】:

基于@freestyle 和@lehins 的回答,我写了这个。这更接近@freestyle 的答案,但使用@lehins 指出的unsafeAtomicAddIntArray 并保持使用loopM_。出于某种原因,尽管充分利用了所有内核,但@lehins 的回答并未在合理的时间内完成计算。这似乎是使用 massiv scheduler 的结果,但我不确定。值得一提的是,没有使用交换,并且内存始终可用。与我的问题中未使用 forkIO 或 forkST_ 的程序相比,使用此解决方案的速度提高了 3 倍。这也比我的单线程程序多使用 50% 的内存。我稍后可能会在这个答案中添加一个程序,该程序通过将线程从最外层循环分配到最内层循环,直到ncaps 的所有线程都被分配,同时考虑到每个循环的大小,来概括交换操作并行嵌套循环处理。

 unsafePerformIO $ do
        let arrz = 10 :: Int
        let ncaps = 8 :: Int -- see also getNumCapabilities
        let outerLoopSize = 10^5  :: Int
        let innerLoopSize = 10^12 :: Int
        let chunksz = outerLoopSize `div` (ncaps-1)
        myarray <- newMArray (Sz arrz) 0
        Control.Monad.Parallel.forM_ [0 .. (ncaps-1)] (\k -> (loopM_ (k * chunksz) (< (min outerLoopSize ((k+1) * chunksz) )) (+ 1) (\j -> (loopM_ 0 (< innerLoopSize) (+ 1) (\i -> when (mytruthcheck j i) (void (unsafeAtomicAddIntArray myarray (funcofji j i) 1)))))))
        unsafeFreeze Par myarray

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-10
    • 2019-05-08
    • 2019-03-11
    • 2012-04-13
    • 1970-01-01
    相关资源
    最近更新 更多