【问题标题】:Why does my concurrent Haskell program terminate prematurely?为什么我的并发 Haskell 程序过早终止?
【发布时间】:2011-11-26 03:10:04
【问题描述】:

我有一个 UDP 服务器,它反映了它收到的每条 ping 消息(我认为这很好用)。我是客户端,然后我想做两件事:

  1. 确保我发送了 N(例如 10000)条消息,并且
  2. 计算正确收到的响应数。

似乎由于 UDP 的性质或由于 forkIO 的事情,我下面的客户端代码过早结束/根本不做任何计数。

我也很惊讶地看到函数tryOnePing 返回了 Int 4 的 250 倍。为什么会这样?

main = withSocketsDo $ do
        s <- socket AF_INET Datagram defaultProtocol
        hostAddr <- inet_addr host
        thread <- forkIO $ receiveMessages s
        -- is there any better way to eg to run that in parallel and make sure
        -- that sending/receiving are asynchronous? 


        -- forM_ [0 .. 10000] $ \i -> do
              -- sendTo s "ping" (SockAddrInet port hostAddr)
        -- actually this would be preferred since I can discard the Int 4 that
        -- it returns but forM or forM_ are out of scope here?

        let tryOnePing i = sendTo s "ping" (SockAddrInet port hostAddr)
        pings <- mapM tryOnePing [0 .. 1000]
        let c = length $ filter (\x -> x==4) pings

        -- killThread thread
        -- took that out to make sure the function receiveMessages does not
        -- end prematurely. still seems that it does

        sClose s
        print c
        -- return()

receiveMessages :: Socket -> IO ()
receiveMessages socket = forever $ do
        -- also tried here forM etc. instead of forever but no joy
        let recOnePing i = recv socket 1024
        msg <- mapM recOnePing [0 .. 1000]
        let r = length $ filter (\x -> x=="PING") msg
        print r
        print "END"

【问题讨论】:

标签: networking haskell concurrency


【解决方案1】:

这里的主要问题是,当您的主线程完成时,所有其他线程都会自动终止。您必须让主线程等待receiveMessages thread,否则它很可能会在收到任何响应之前完成。一种简单的方法是使用MVar

MVar 是一个同步单元格,可以为空或只保存一个值。如果当前线程试图从一个空的MVar 中取出或插入一个完整的线程,它会阻塞。 在这种情况下,我们不关心值本身,所以我们只需在其中存储一个()

我们将从空的MVar 开始。然后主线程将分叉出接收线程,发送所有数据包,并尝试从MVar中获取值。

import Control.Concurrent.MVar

main = withSocketsDo $ do
    -- prepare socket, same as before

    done <- newEmptyMVar

    -- we need to pass the MVar to the receiver thread so that
    -- it can use it to signal us when it's done
    forkIO $ receiveMessages sock done

    -- send pings, same as before

    takeMVar done    -- blocks until receiver thread is done

在接收线程中,我们将接收所有消息,然后在MVar 中放入一个() 以表示我们已完成接收。

receiveMessages socket done = do
    -- receive messages, same as before

    putMVar done ()  -- allows the main thread to be unblocked

这解决了主要问题,程序在我的 Ubuntu 笔记本电脑上运行良好,但还有一些事情需要处理。

  • sendTo 不保证会发送整个字符串。您必须检查返回值以查看发送了多少,如果不是全部发送,则重试。如果发送缓冲区已满,即使像 "ping" 这样的短消息也会发生这种情况。

  • recv 需要连接的套接字。你会想改用recvFrom。 (尽管由于某种未知原因它仍然可以在我的电脑上运行)。

  • 打印到标准输出不同步,因此您可能需要更改此设置,以便使用MVar 来传达接收数据包的数量,而不仅仅是()。这样,您就可以完成主线程的所有输出。或者,使用另一个MVar 作为互斥体来控制对标准输出的访问。

最后,我建议您仔细阅读Network.SocketControl.ConcurrentControl.Concurrent.MVar 的文档。我的大部分答案都是从那里找到的信息拼接在一起的。

【讨论】:

  • 谢谢,有什么方法可以让它们保持非同步/非阻塞状态,同时保持它们同时运行,但不将它们作为单独的应用程序启动?
  • @JFritsch:不确定我是否理解“单独的应用程序”部分。你说的是客户端和服务器吗?
  • 有什么办法可以吗?使用 par 组合器等让主线程运行(同时仍继续接收器线程),以防它在实践中成为更大程序的一部分。
  • @JFritsch: par 用于并行性,而不是并发性。如果你想让两个线程都运行,那没问题。只有当整个应用程序的主线程完成时,其他线程才会被杀死。我很难在这里更详细地回答,因为我不知道您的更大程序的细节,但是您可能不得不在某些时候使用一些同步来在线程之间进行通信。 MVar 只是众多可用同步选项中的一个示例。
  • 它真的可以在您的 PC 上运行吗?当涉及到 MVar 时,我得到了两个奇怪的错误:recv: invalid argument (Bad file descriptor)thread blocked indefinitely in an MVar operation,根据本书,后者应该在线程存在循环依赖时发生。我找不到任何东西。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多