【问题标题】:QSem doesn't seem to block threadsQSem 似乎没有阻塞线程
【发布时间】:2023-03-11 00:13:01
【问题描述】:

我正在编写一个简单的脚本来使用Shelly 库并行运行一堆任务,但我想限制任何时候运行的最大任务数。该脚本获取一个文件,每行都有一个输入,并为该输入运行一个任务。文件中有几百个输入,我想一次限制为大约 16 个进程。

当前脚本实际上限制为 1(很好地尝试)使用初始计数为 1 的 QSem。但我似乎遗漏了一些东西,因为当我在具有 4 个输入的测试文件上运行时,我看到了这个:

开始 开始 开始 开始 完毕 完毕 完毕 完毕

所以线程并没有像我期望的那样阻塞在 QSem 上,它们都是同时运行的。我什至在MVarTVar 上都实现了我自己的信号量,但都没有按我预期的方式工作。我显然错过了一些基本的东西,但是什么?我还尝试编译代码并将其作为二进制文件运行。

#!/usr/bin/env runhaskell {-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-} 进口雪莉 导入前奏隐藏(FilePath) 导入 Text.Shakespeare.Text (lt) 导入合格的 Data.Text.Lazy 作为 LT 导入 Control.Monad (forM) 导入 System.Environment (getArgs) 将合格的 Control.Concurrent.QSem 导入为 QSem 导入 Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar) -- 定义最大并发进程数 maxProcesses :: IO QSem.QSem maxProcesses = QSem.newQSem 1 bkGrnd :: ShIO a -&gt ShIO (MVar a) bkGrnd 过程 = 做 mvar &lt-liftIO newEmptyMVar _ &lt-liftIO $ forkIO $ 做 -- 阻塞直到有空闲进程 sem &lt- maxProcesses QSem.waitQSem sem putStrLn "开始" -- 运行shell命令 结果 &lt- shelly $ 静默处理 liftIO $ putMVar mvar 结果 putStrLn “完成” -- 表示这个过程已经完成并且另一个可以运行。 QSem.signalQSem sem 返回 mvar 主::IO() main = shelly $ 默默地 $ do [img, file] &lt- liftIO $ getArgs 内容 &lt- readfile $ fromText $ LT.pack 文件 -- 为每一行输入运行一个后台进程。 结果 &lt- forM (LT.lines 内容) $ \line -> bkGrnd $ do runStdin &ltcommand> &ltarguments> LiftIO $mapM_takeMVar 结果

【问题讨论】:

  • 我不了解雪莉,但从您的代码看来,bkGrnd 的每个应用程序都有自己的新信号量,初始化为 1。您应该先创建一个,然后将同一个信号量传递给每次通话。

标签: haskell semaphore shelly


【解决方案1】:

正如我在评论中所说,对bkGrnd 的每次调用都会创建自己的信号量,从而允许每个线程继续进行而无需等待。我会尝试这样的事情,其中​​信号量是在main 中创建的,并且每次都传递给bkGrnd

bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    QSem.waitQSem sem
    --
    -- code continues as before
    --

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    sem <- maxProcesses
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results

【讨论】:

  • 哇,我是个白痴:p。我以前从未尝试在 Haskell 中使用全局可变数据(不是我通常喜欢的东西,但它是一个脚本),但是既然您已经指出了问题,那么问题就很明显了。谢谢!
  • @AndrewMyers:别担心,即使是最简单的并发错误有时也很难发现 :) 顺便说一句,sem 不是全局的,我宁愿说它是共享的。它在main 中声明,并作为对共享信号量的“引用”传递给线程。
  • 是的,我的意思是我打算做什么。我将maxProcesses 视为全局信号量,但它是一个全局IO QSem 操作,每次都会创建一个新信号量。你的方式要干净得多,如果我不尝试像在 zsh 中那样编写脚本,我通常会这样做。所以我认为如果不使用unsafePerformIO,实际上不可能拥有全局可变状态,我是否正确。如果是这样,那很酷,但我以前没有意识到。
  • 我明白了,当您说“全球”时,我以为您在谈论我的解决方案。然而,我对 Haskell 比较陌生,所以我对unsafePerformIO 仍然一无所知:对不起,我不能告诉你这是否是唯一的方法(也不是它确实是一种处理全局数据的方法:) )。
【解决方案2】:

你有答案,但我需要补充一点:如果 killThread 或异步线程死亡是可能的,则 QSem 和 QSemN 不是线程安全的。

我的错误报告和补丁是GHC trac ticket #3160。固定代码以名为 SafeSemaphore 的新库的形式提供,带有模块 Control.Concurrent.MSem、MSemN、MSampleVar 和奖励 FairRWLock。

【讨论】:

  • 当计划将其合并到 7.0.1 作为 QSem 的更新时,我在邮件列表中看到了部分讨论。我从 Trac 票上看到没有发生任何事情,所以我会检查安全包。感谢您的提示!
【解决方案3】:

不是更好

bkGrnd sem proc = do
  QSem.waitQSem sem
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
  ...

在你得到信号量之前,甚至forkIO

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-03
    • 2012-12-18
    • 2011-05-21
    • 1970-01-01
    • 1970-01-01
    • 2020-04-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多