【问题标题】:Generalizing a function to merge a set of Haskell pipes Producers泛化一个函数以合并一组 Haskell 管道生产者
【发布时间】:2014-03-11 18:42:44
【问题描述】:

我正在使用Haskell pipes package

我正在尝试使用pipes-concurrency 将生产者列表合并在一起。

我想要达到的是:

merge :: MonadIO m => [Producer a m ()] -> Producer a m ()

所以给定一个生产者 s1 和另一个生产者 s2:r = merge [s1, s2] 这将给出行为:

s1 --1--1--1--|
s2 ---2---2---2|
r  --12-1-21--2|

按照我想出的教程页面中的代码:

mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    fork :: Output a -> Producer a IO () -> IO ()
    fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
                                              performGC

按预期工作。

但是我很难概括事物。

我的尝试:

merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
    (output, input) <- liftIO $ spawn Unbounded
    _ <- liftIO $ mapM (fork output) producers
    fromInput input
  where
    runEffectIO :: Monad m => Effect m r -> IO (m r)
    runEffectIO e = do
        x <- evaluate $ runEffect e
        return x
    fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
                                       performGC

不幸的是,这可以编译,但并没有做太多其他事情。我猜我把runEffectIO弄得一团糟。我目前的runEffectIO 的其他方法没有产生更好的结果。

程序:

main = do
    let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
    _ <- runEffect $ producer >-> taker 20
  where repeater :: Int -> Int -> Producer Int IO r
        repeater val delay = forever $ do
            lift $ threadDelay delay
            yield val
        taker :: Int -> Consumer Int IO ()
        taker 0 = return ()
        taker n = do
            val <- await
            liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
            taker $ n - 1

点击val &lt;- await,但没有到达liftIO $ putStrLn,因此它不会产生任何输出。但是它没有挂起就可以正常退出。

当我用mergeIO 替换merge 时,程序运行时,我希望输出20 行。

【问题讨论】:

  • 如果使用基于推送的管道会更好吗?即&gt;~&gt; 而不是&gt;-&gt;?

标签: haskell haskell-pipes


【解决方案1】:

虽然MonadIO 不足以完成此操作,但MonadBaseControl(来自monad-control)旨在允许在基本单子中嵌入任意转换器堆栈。配套包lifted-base 提供了fork 的版本,适用于变压器堆栈。我已经整理了一个使用它来解决您的问题的示例in the following Gist,尽管主要的魔法是:

import qualified Control.Concurrent.Lifted as L
fork :: (MonadBaseControl IO m, MonadIO m) => Output a -> Producer a m () -> m ThreadId
fork output producer = L.fork $ do
    runEffect $ producer >-> toOutput output
    liftIO performGC

请注意,您应该了解以这种方式处理一元状态会发生什么:对子线程中执行的任何可变状态的修改将仅与这些子线程隔离。换句话说,如果你使用StateT,每个子线程都会从它被派生时在上下文中的相同状态值开始,但是你会有许多不同的状态,它们不会相互更新。

在 monad-control 上有一个 appendix in the Yesod book,尽管坦率地说它有点过时了。我只是不知道最近的教程。

【讨论】:

    【解决方案2】:

    问题似乎是您使用了evaluate,我认为它是来自Control.Exceptionevaluate

    您似乎正在使用它来将通用 monad m 中的值“转换”为 IO,但实际上并不是这样。您只是从Effect 中获取m 值,然后将其返回到IO 中,而不实际执行它。以下代码不打印“foo”:

    evaluate (putStrLn "foo") >> return ""
    

    也许您的merge 函数可以将函数m a -&gt; IO a 作为附加参数,以便merge 知道如何将runEffect 的结果带入IO

    【讨论】:

    • 是否可以创建MonadIO m =&gt; m a -&gt; IO a 类型的函数?
    • 特别是,这里有一个newtype ReaderIO r a = ReaderIO { runReaderIO :: r -&gt; IO a }instance MonadIO ReaderIO where liftIO io = ReaderIO $ \_ -&gt; io 的例子。这个例子清楚地表明,除非我们首先提供r,否则我们不能“逃避”ReaderIO r 中的IO 操作。此行为特定于 ReaderIO,不能推广到所有 MonadIOs。
    • @DannyNavarro 不是孤立的。我的意思是merge :: (MonadIO m) =&gt; (m () -&gt; IO ()) -&gt; [Producer a m ()] -&gt; Producer a m ()
    【解决方案3】:

    不幸的是,您不能将ProducerMonadIO 基本单子(或任何MonadIO 计算)分叉。在 fork 计算之前,您需要特别包含运行所有其他 monad 转换器以获取 IO 操作所需的逻辑。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-05-04
      • 2017-02-21
      • 2019-12-07
      • 2012-03-25
      • 1970-01-01
      • 2023-03-06
      • 2023-03-26
      • 1970-01-01
      相关资源
      最近更新 更多