【问题标题】:Concurrent stack implementation using MVar使用 MVar 的并发堆栈实现
【发布时间】:2014-05-05 11:42:18
【问题描述】:

我正在尝试实现一个堆栈以在并发应用程序中使用。我想要以下语义:push 永远不应该阻塞,pop 应该阻塞空堆栈上的调用线程,但仍然允许 pushes。我实现如下(底部无关位):

data Stream a = Stream a (MVar (Stream a))
data Stack a = Stack (MVar (Int, MVar (Stream a)))

popStack :: Stack a -> IO a 
popStack (Stack stack) = do 
  (sz, mvar) <- takeMVar stack
  mbStream <- tryTakeMVar mvar 
  case mbStream of 
    Nothing -> putMVar stack (sz, mvar) >> popStack (Stack stack)
    Just (Stream x xs) -> putMVar stack (sz-1, xs) >> return x

如果流MVar 为空,我必须释放堆栈上的锁并重试。然而,这看起来像是一个杂物:如果一个线程在一个空堆栈上调用pop,它可能会在被挂起之前循环几次,即使在该线程正在执行时MVar 不会变满。有没有更好的方法利用MVars 来编写具有所需语义的pop


import Control.Concurrent.MVar 
import Control.Monad 
import Control.Concurrent
import Text.Printf

newStack :: IO (Stack a) 
newStack = do 
  stream <- newEmptyMVar 
  Stack <$> newMVar (0, stream)

pushStack :: Stack a -> a -> IO ()
pushStack (Stack stack) val = do 
  (sz, stream) <- takeMVar stack
  stream' <- newMVar (Stream val stream)
  putMVar stack (sz+1, stream')

test = do 
  s <- newStack
  _ <- forkIO $ mapM_ (\a -> printf "pushing %c... " a >> pushStack s a >> threadDelay 100000) ['a' .. 'z']
  _ <- forkIO $ do 
         replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
         replicateM 13 (popStack s) >>= printf "\npopped 13 elems: %s\n"
  threadDelay (5*10^6)
  putStrLn "Done"

【问题讨论】:

  • 给定的语义不完整。您需要类似“已推送每个弹出结果”之类的内容。尽管如此,您仍无法将堆栈与队列(?)区分开来,因此您可以查看hackage.haskell.org/package/base-4.7.0.0/docs/… 的(源代码)。请注意,那里没有tryTakeMVar。而且-您绝对想自己进行锁定吗?如果没有,请考虑 STM。
  • @d8d0d65b3f7cf42 为什么我需要将其添加到语义中?如果一个值被弹出,如果它没有被推送,它怎么会到达那里? Chan 是 FIFO,Stack 是相反的 (LIFO) - 它们如何无法区分?如果我能用ChanStack,那就太好了。我见过Chan,我不认为我可以以同样的方式实现Stack - 对于堆栈来说,读写MVars 无论如何都会指向相同的值。我可以使用 STM——但这能解决这个特殊问题吗?
  • 您的规范仅涉及阻塞。你需要添加一些关于价值观的东西。否则,您的实现可能会凭空制造,或者忽略已推送的内容,或复制它。 FIFO 在并发设置中的含义并不明显。也许你想要一些静态属性(“如果只有一个线程在运行,那么它就是 FIFO”),或者线性化。在我们开始讨论(正确性)实现之前,所有这些都需要明确说明。
  • 好的,当你说语义不完整时,我明白你的意思了。 push 应该总是将堆栈上的元素数量增加 1,并且 pop 应该总是将该数量减少 1,除非堆栈为空,在这种情况下它会阻塞。我希望如果我知道 1 在 2 之前被推送,这意味着 2 将在 1 之前弹出。无论如何,我会在继续之前阅读您链接的论文。
  • 你不一定有一个明确定义的“之前”关系,或者你需要太多的锁定来强制它。

标签: haskell concurrency


【解决方案1】:

这不是很令人兴奋,但最简单的解决方案是使用 STM(如果您使用的是 cabal,则需要依赖项列表中的 stm 包)。

import Control.Concurrent.STM

newtype Stack a = Stack (TVar [a])

new :: STM (Stack a)
new = fmap Stack $ newTVar []

put :: a -> Stack a -> STM ()
put a (Stack v) = modifyTVar' v (a:)

get :: Stack a -> STM a
get (Stack v) = do
    stack <- readTVar v
    case stack of
         [] -> retry
         (a:as) -> do writeTVar v as
                      return a

您可以使用retry 获得您想要的阻塞行为,它的实现方式是在TVar 更改为[] 以外的其他内容之前不会唤醒线程。这也很好,因为您现在可以在更大的组合原子事务中使用您的堆栈,并且您不必担心确保异常不会破坏您的结构。

如果您尝试通过大量线程争用读取和/或写入来实现高性能并发,您可能会发现这样做不够聪明。在这种情况下,您可能会很高兴设计一个基于来自 atomic-primops 的基于 fetch-and-add 的计数器的结构,或者查看 hackage 上还有什么可用的。

【讨论】:

【解决方案2】:

快速评论:

  1. “推送不应该阻塞”不是您要实际实现的目标。尽管您可能对“块”有一个不同于 GHC 含义的个人定义。例如,您的 pushStack 确实会阻塞。
  2. 空堆栈上的popStack 进入一个非常繁忙的循环,反复获取和放置堆栈MVar。你不想这样做,甚至说“pop 应该阻止”。
  3. 您使用 takeMVar 和 putMVar 而不是 withMVar 或 modifyMVar。这意味着您没有考虑异常,并且堆栈在通用库中不会很好。

所以您已经了解了 MVar,并且您正在使用它们来了解更多信息。

这里的 StackData 是一个有数据的堆栈(Full)或没有数据的堆栈(Empty)。当 Empty 时,有一个初始为空的 MVar 供饥饿的 poppers 等待。

type Lock = MVar ()
type Some a = (a, [a]) -- non empty version of list
data StackData a = Full !(Some a)
                 | Empty !Lock
data Stack a = Stack { stack :: MVar (StackData a) }

pop s = do
    x <- modifyMVar (stack s) $ \ sd ->
           case sd of
               Empty lock -> do
                   return (Empty lock, Left lock)
               Full (a, []) -> do
                   lock <- newEmptyMVar
                   return (Empty lock, Right a)
               Full (a, (b:bs)) -> return (Full (b, bs), Right a)
    case x of
        Left lock -> do
            withMVar lock return  -- wait on next pusher
            pop s
        Right a -> return a


 push s a = modifyMVar_ (stack s) $ \ sd ->
           case sd of
               Empty lock -> do
                   tryPutMVar lock () -- should succeed, releases waiting poppers
                   evaluate Full (a,[]) -- do not accumulate lazy thunks
               Full (b, bs) -> do
                   xs <- evaluate (b:bs) -- do not accumulate lazy thunks
                   evaluate (Full (a, xs)) -- do not accumulate lazy thunks

注意:我没有尝试编译这个。

编辑:更安全的 push 版本只需要在成功将堆栈从 Empty 修改为 Full 时将 () 放入锁中。这种确定性可以通过“掩码”操作来实现。 'restore' 在 'modifyMVar' 中使用,但不是必需的:

push s a = mask $ \restore -> do
    mLock <- modifyMVar (stack s) $ \ sd -> restore $
           case sd of
               Empty lock -> do
                   n <- evaluate Full (a,[]) -- do not accumulate lazy thunks
                   return (n, Just lock)
               Full (b, bs) -> do
                   xs <- evaluate (b:bs) -- do not accumulate lazy thunks
                   n <- evaluate (Full (a, xs))
                   return (n, Nothing)
    whenJust mLock $ \ lock -> tryPutMVar lock ()

【讨论】:

  • 1.它会阻塞,但前提是有另一个推送,并且推送应该很快发生,所以在实践中它应该(我希望)表现得好像它没有阻塞一样。 2. 这就是问题的重点。 3.我打算稍后实现异常处理。我想先让基础知识发挥作用。
  • 您的实现似乎符合我的要求。但是为什么要在 push 上评估整个堆栈呢?我希望事情由 poppers 来评估,而不是 pushers。
  • 我们的 pushers 和 poppers 都占用 MVar 堆栈,因此会阻止其他用户。评估仅是 WHNF,仅强制新的 (:)-cons 单元格和完整构造函数。数据本身不是强制的。这可以防止推送者构建一长串非强制 (:) 和完整应用程序。使用严格 (!) 构造函数创建数据类型也会使其成为 WHNF。在没有正确强制的情况下将惰性的东西放入 MVar 是一个常见的故障,所以我避免做一个坏例子。如果阻塞弹出器死亡,或者在弹出时被杀死,则堆栈中的任何元素都不会丢失;新的编辑让推动者死亡更安全。
猜你喜欢
  • 2011-04-29
  • 2022-01-25
  • 1970-01-01
  • 1970-01-01
  • 2013-01-25
  • 1970-01-01
  • 2011-07-29
  • 2019-05-09
  • 2015-08-17
相关资源
最近更新 更多