【问题标题】:Conduit Source depending on MVar取决于 MVar 的导管源
【发布时间】:2018-02-11 23:20:40
【问题描述】:

我正在为订阅队列并将所有到达的消息放入 MVar 的某些客户端实现 Conduit Source。

问题是我无法通过管道源从该 MVar 读取到 yield 这些消息,因为它在运行时报告异常:thread blocked indefinitely in an MVar operation

mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
  bracketP mkConsumer cleanConsumer runHandler
 where
  mkConsumer = do
    chan <- liftIO $ newEmptyMVar
    client <- liftIO.hookToChan $ chan
    return (chan, client)

  cleanConsumer (_, client) =
    liftIO.disconnectClient $ client

  runHandler (chan, client) = do
    newMsg <- liftIO $ readMVar chan
    yield newMsg
    runHandler (chan, client)

(hookToChan 只是告诉客户端使用这个函数订阅队列:\topic msg -&gt; putMVar chan (show msg))

【问题讨论】:

  • 客户端运行在哪个线程?
  • 为什么要使用MVar 而不是Chan
  • 感谢@Cirdec,我在同一个线程中生成客户端 :) 另外,从 MVar 切换到 Chan,对此一无所知。在这种情况下更有用。
  • 已经有一个用于从并发通道创建管道的库:stm-conduit
  • 是的,但我目前正在从Eta 这样做,它需要移植 stm-conduit @MadNat 。谢谢:D

标签: haskell concurrency conduit


【解决方案1】:

感谢 Cirdec 制作的 cmets,我设法解决了这个问题。

问题是我在同一个线程中生成客户端。

hookToChan 负责这样做,我在同一个线程上订阅了队列。我刚刚在hookToChan 函数中添加了forkIO,问题就消失了。

【讨论】:

    猜你喜欢
    • 2014-02-04
    • 1970-01-01
    • 2020-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多