【问题标题】:Stop threads from interleaving output停止线程交错输出
【发布时间】:2014-01-18 05:37:29
【问题描述】:

以下程序创建两个并发运行的线程,每个线程随机休眠一段时间,然后将一行文本打印到标准输出。

import Control.Concurrent
import Control.Monad
import System.Random

randomDelay t = randomRIO (0, t) >>= threadDelay

printer str = forkIO . forever $ do
  randomDelay 1000000 -- μs
  putStrLn str

main = do
  printer "Hello"
  printer "World"
  return ()

输出通常看起来像

>> main
Hello
World
World
Hello
WoHrelld
o
World
Hello
*Interrupted
>>

如何确保一次只有一个线程可以写入标准输出?这似乎是 STM 应该擅长的那种事情,但是所有 STM 事务对于某些 a 必须具有 STM a 类型,并且打印到屏幕的操作具有 IO a 类型,并且没有似乎是将IO 嵌入STM 的一种方式。

【问题讨论】:

    标签: multithreading haskell io stm io-monad


    【解决方案1】:

    使用STM 无法锁定您所描述的方式。这是因为STM 基于乐观锁定,因此每个事务都必须在任何时候都可以重新启动。如果将IO 操作嵌入到STM 中,它可能会被执行多次。

    这个问题最简单的解决方案可能是使用MVar 作为锁:

    import Control.Concurrent
    import Control.Concurrent.MVar
    import Control.Monad
    import System.Random
    
    randomDelay t = randomRIO (0, t) >>= threadDelay
    
    printer lock str = forkIO . forever $ do
      randomDelay 1000000
      withMVar lock (\_ -> putStrLn str)
    
    main = do
      lock <- newMVar ()
      printer lock "Hello"
      printer lock "World"
      return ()
    

    在这个解决方案中,锁作为参数传递给printer

    有些人更喜欢将锁声明为top-level global variable,但目前这需要unsafePerformIO 并依赖于AFAIK 不属于Haskell 语言报告的GHC 属性(特别是,它依赖于以下事实:一个非多态类型的全局变量在程序执行过程中最多被计算一次)。

    【讨论】:

    • 谢谢,真的很有用!
    【解决方案2】:

    基于Petr Pudlák's answer 的一些研究表明concurrent-extra 包中有一个模块Control.Concurrent.Lock,它提供了基于MVar () 的锁的抽象。

    使用该库的解决方案是

    import           Control.Concurrent
    import qualified Control.Concurrent.Lock as Lock
    import           Control.Monad
    import           System.Random
    
    randomDelay t = randomRIO (0, t) >>= threadDelay
    
    printer lock str = forkIO . forever $ do
      randomDelay 1000
      Lock.with lock (putStrLn str)
    
    main = do
      lock <- Lock.new
      printer lock "Hello"
      printer lock "World"
      return ()
    

    【讨论】:

      【解决方案3】:

      STM 处理输出的方式是有一个输出队列,所有线程共享,并由单个线程处理。

      import Control.Concurrent
      import Control.Concurrent.STM
      import Control.Monad
      import System.Random
      
      randomDelay t = randomRIO (0, t) >>= threadDelay
      
      printer queue str = forkIO . forever $ do
        randomDelay 1000000 -- μs
        atomically $ writeTChan queue str
      
      prepareOutputQueue = do
          queue <- newTChanIO
          forkIO . forever $ atomically (readTChan queue) >>= putStrLn
          return queue
      
      main = do
        queue <- prepareOutputQueue
        printer queue "Hello"
        printer queue "World"
        return ()
      

      【讨论】:

        【解决方案4】:

        这是 Petr 提到的使用全局锁的示例。

        import Control.Concurrent
        import Control.Monad
        import System.Random
        import Control.Concurrent.MVar  (newMVar, takeMVar, putMVar, MVar)
        import System.IO.Unsafe (unsafePerformIO)
        
        
        {-# NOINLINE lock #-}
        lock :: MVar ()
        lock = unsafePerformIO $ newMVar ()
        
        
        
        printer x = forkIO . forever $ do
           randomDelay 100000
           () <- takeMVar lock
           let atomicPutStrLn str =  putStrLn str >> putMVar lock ()
           atomicPutStrLn x
        
        randomDelay t = randomRIO (0, t) >>= threadDelay
        
        
        
        main = do
          printer "Hello"
          printer "World"
          return ()
        

        【讨论】:

        • 这比 Petr 的解决方案有什么优势?
        • 如果您不想将锁作为函数参数传递。假设这个打印机函数在外部代码库中被频繁调用,少1个参数让它感觉更像原生打印函数。
        【解决方案5】:

        如果您愿意,您实际上可以使用 STM 来实现锁定,尽管 MVar 几乎肯定会执行得更好。

        newtype Lock = Lock (TVar Status)
        data Status = Locked | Unlocked
        
        newLocked :: IO Lock
        newLocked = Lock <$> newTVarIO Locked
        
        newUnlocked :: IO Lock
        newUnlocked = Lock <$> newTVarIO Unlocked
        
        -- | Acquire a lock.
        acquire :: Lock -> IO ()
        acquire (Lock tv) = atomically $
          readTVar tv >>= \case
            Locked -> retry
            Unlocked -> writeTVar tv Locked
        
        -- | Try to acquire a lock. If the operation succeeds,
        -- return `True`.
        tryAcquire :: Lock -> IO Bool
        tryAcquire (Lock tv) = atomically $
          readTVar tv >>= \case
            Locked -> pure False
            Unlocked -> True <$ writeTVar tv Locked
        
        -- | Release a lock. This version throws an exception
        -- if the lock is unlocked.
        release :: Lock -> IO ()
        release (Lock tv) = atomically $
          readTVar tv >>= \case
            Unlocked -> throwSTM DoubleRelease
            Locked -> writeTVar tv Unlocked
        
        data DoubleRelease = DoubleRelease deriving Show
        instance Exception DoubleRelease where
          displayException ~DoubleRelease = "Attempted to release an unlocked lock."
        
        -- | Release a lock. This version does nothing if
        -- the lock is unlocked.
        releaseIdempotent :: Lock -> IO ()
        releaseIdempotent (Lock tv) = atomically $ writeTVar tv Unlocked
        
        -- | Get the status of a lock.
        isLocked :: Lock -> IO Status
        isLocked (Lock tv) = readTVarIO tv
        

        acquire/release 对需要仔细的屏蔽和异常处理,就像原始的 MVar 操作一样。文档建议但实际上并未说明 STM 操作在 retry 时是可中断的;假设这是真的,用于withMVar 的相同方法将在这里工作。注意:我打开了一个GHC ticket 来记录retry 可中断性。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多