【问题标题】:Pipeline-like operation using TChan使用 TChan 的流水线式操作
【发布时间】:2015-06-08 15:33:16
【问题描述】:

我想在两个线程之间实现管道。我有线程 A 获取数据,对其进行处理,然后将其发送到线程 B。我有一个 MVar 来检查数据是否已完全处理

但是,我遇到了一个例外*** Exception: thread blocked indefinitely in an STM transaction

为什么我的线程被阻塞了?我虽然比第一个线程在通道上写入时,然后当通道上有数据时,第二个线程可以读取它

fstPipe :: (a -> b) -> TChan b -> MVar () -> [a] -> IO ()
fstPipe f chIn m xs = do
    ( mapM_(\x-> atomically $ writeTChan chIn $ f x) xs) >> putMVar m ()

pipelineDone channel mIn = do
    isDone <- fmap isJust $ tryTakeMVar mIn
    isEmpty <- atomically $ isEmptyTChan channel
    return $ isDone && isEmpty

lastPipe f chIn mIn = iter 
    where iter = do
        atomically $ fmap f $ readTChan chIn
        isDone <- pipelineDone chIn mIn
        unless isDone $ iter

pipeline = do
    chIn <- atomically newTChan
    m <- newEmptyMVar
    first <- async $ fstPipe reverse chIn m $ replicate 10 [1..500]
    last <- async $ lastPipe print chIn m
    wait first
    wait last

【问题讨论】:

    标签: multithreading haskell stm


    【解决方案1】:

    在同一个代码块中使用 STM 信号量对我来说似乎很奇怪......为什么不在 STM 中做整个事情呢?

    特别是,为什么不是TChan (Maybe x)Nothing 表示序列的结束?

    另外,请注意您的 fstPipe 可能只是生成了一堆未评估的 thunk,并立即将它们放入 TChan,而无需实际计算任何东西。你可能想要一个seq 或类似的东西来强制在那个线程上发生一些实际的工作

    【讨论】:

    • 谢谢!因此,如果我理解得很好,如果我写“writeTchan chin $ seq $ f x”,它会评估 f x 然后将其添加到频道中?
    • 如果你做writeTChan ch $! f x(注意$!而不是$),这将强制f的结果弱头正常形式。如果您想进一步评估......好吧,这是另一个问题。搜索 SO 以获取以前的答案。 ;-)
    【解决方案2】:

    我认为有一个竞争条件:

    • putMVar 之前停止fstPipe
    • 提前lastPipe阅读所有内容,然后致电pipelineDone
    • pipelineDone 返回 False,因为 putMVar 尚未完成
    • lastPipe 将尝试从频道读取数据
    • putMVar 执行,但为时已晚

    现在lastPipe 卡在一个空频道上阅读。

    【讨论】:

      【解决方案3】:

      您的问题在于pipelineDone 的逻辑。目前,您有:

      pipelineDone channel mIn = do
        isDone <- fmap isJust $ tryTakeMVar mIn
        isEmpty <- atomically $ isEmptyTChan channel
        return $ isDone && isEmpty
      

      tryTakeMVar 将获取 MVar 的内容,假设其中有东西。假设您的生产者首先完成,它会将() 写入 MVar。然后,您的消费者将尝试获取其中的内容。如果成功,则 MVar 为空。任何后续的tryTakeMVar 将始终返回Nothing,因此isDone &amp;&amp; isEmpty 将始终返回false,您将继续尝试从TChan 中读取。一旦TChan 为空,GHC 就会告诉您它遇到了死锁。

      您应该改为将您的 pipelineDone 实现更改为:

      pipelineDone channel mIn = do
        stillRunning <- isEmptyMVar mIn
        isEmpty <- atomically $ isEmptyTChan channel
        return $ (not stillRunning) && isEmpty
      

      这将改为简单地轮询 MVar,而不是实际清空它。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2022-11-26
        • 1970-01-01
        • 1970-01-01
        • 2014-03-22
        • 1970-01-01
        相关资源
        最近更新 更多