【发布时间】:2018-02-11 23:20:40
【问题描述】:
我正在为订阅队列并将所有到达的消息放入 MVar 的某些客户端实现 Conduit Source。
问题是我无法通过管道源从该 MVar 读取到 yield 这些消息,因为它在运行时报告异常:thread blocked indefinitely in an MVar operation
mqttSource :: (Monad m, MonadIO m, MonadResource m) => MqttOptions -> Source m String
mqttSource MqttOptions {..} = do
bracketP mkConsumer cleanConsumer runHandler
where
mkConsumer = do
chan <- liftIO $ newEmptyMVar
client <- liftIO.hookToChan $ chan
return (chan, client)
cleanConsumer (_, client) =
liftIO.disconnectClient $ client
runHandler (chan, client) = do
newMsg <- liftIO $ readMVar chan
yield newMsg
runHandler (chan, client)
(hookToChan 只是告诉客户端使用这个函数订阅队列:\topic msg -> putMVar chan (show msg))
【问题讨论】:
-
客户端运行在哪个线程?
-
为什么要使用
MVar而不是Chan? -
感谢@Cirdec,我在同一个线程中生成客户端 :) 另外,从 MVar 切换到 Chan,对此一无所知。在这种情况下更有用。
-
已经有一个用于从并发通道创建管道的库:stm-conduit
-
是的,但我目前正在从Eta 这样做,它需要移植 stm-conduit @MadNat 。谢谢:D
标签: haskell concurrency conduit