【问题标题】:Haskell, Channels, STM, -threaded, Message PassingHaskell、Channels、STM、线程、消息传递
【发布时间】:2015-08-01 03:20:19
【问题描述】:

我正在尝试使用通道/STM 在 Haskell 中实现消息传递。也许这是一个糟糕的想法,并且有更好的方法来实现/使用 Haskell 中的消息传递。如果是这种情况,请告诉我;但是,我的任务提出了一些关于并发 Haskell 的基本问题。

我听说过关于 STM 的好消息,尤其是在 Haskell 中的实现。由于它支持读写,并具有一些安全优势,我认为可以从那里开始。这提出了我最大的问题:是否

msg <- atomically $ readTChan chan

其中 chan 是一个 TChan Int,导致等待通道有一个值吗?

考虑以下程序:

p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main = do
    chan <- atomically $ newTChan
    p chan
    q chan

用 ghc --make -threaded 编译它,然后运行程序,你确实得到了 1 和 2 打印到控制台。现在,假设我们这样做了

main = do 
    chan <- atomically $ newTChan
    forkIO $ p chan 
    forkIO $ q chan

相反。现在,如果我们使用 - 线程,它将不打印任何内容,1 或 1 后跟 2 到终端;但是,如果您不使用 -thread 进行编译,它总是会打印 1 后跟 2。 问题 2: -threaded 和 not 之间有什么区别?我想它们并没有真正作为并发的东西运行,它们只是一个接一个地运行。这与下文一致。

现在,我想如果我让 p 和 q 同时运行;即我对它们进行了 forkIO,它们应该能够以相反的顺序运行。假设

main = do
    chan <- atomically newTChan
    forkIO $ q chan
    forkIO $ p chan

现在,如果我在没有 -thread 的情况下编译它,我永远不会将任何内容打印到控制台。如果我用 -thread 编译,我有时会这样做。虽然,得到 1 后 2 的情况非常罕见——通常只有 1 或没有。我也用 Control.Concurrent.Chan 进行了尝试,得到了一致的结果。

第二个大问题:channels 和 fork 是如何一起玩的,上面的程序是怎么回事?

无论如何,我似乎无法如此天真地模拟 STM 的消息传递。也许 Cloud Haskell 是解决这些问题的一种选择——我真的不知道。任何有关如何让消息传递缺少序列化的信息 ~~> 写入套接字 ~~> 从套接字读取 ~~> 反序列化将不胜感激。

【问题讨论】:

标签: haskell concurrent-programming stm


【解决方案1】:

不,你的想法是对的——这就是TChans 的用途——你只是漏掉了forkIO 的一个小点:

问题是你的主线程不会等待用forkIOsee here for reference)创建的线程终止

所以如果我使用参考中给出的 hint

import Control.Concurrent
import Control.Concurrent.STM

p :: Num a => TChan a -> IO ()
p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main :: IO ()
main = do
    children <- newMVar []
    chan <- atomically $ newTChan
    _ <- forkChild children $ p chan
    _ <- forkChild children $ q chan
    waitForChildren children
    return ()

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

它按预期工作:

d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main             ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe 
1
2
d:/Temp $

当然,如果您也将呼叫切换到 pq,它将继续工作

【讨论】:

  • 是否有任何模块/库可以简化这个forkChild/waitForChildren 的事情?
猜你喜欢
  • 2012-08-02
  • 1970-01-01
  • 2021-07-08
  • 2021-03-27
  • 2012-01-07
  • 1970-01-01
  • 1970-01-01
  • 2013-12-17
  • 1970-01-01
相关资源
最近更新 更多