【发布时间】:2014-03-11 18:42:44
【问题描述】:
我正在使用Haskell pipes package。
我正在尝试使用pipes-concurrency 将生产者列表合并在一起。
我想要达到的是:
merge :: MonadIO m => [Producer a m ()] -> Producer a m ()
所以给定一个生产者 s1 和另一个生产者 s2:r = merge [s1, s2] 这将给出行为:
s1 --1--1--1--|
s2 ---2---2---2|
r --12-1-21--2|
按照我想出的教程页面中的代码:
mergeIO :: [Producer a IO ()] -> Producer a IO ()
mergeIO producers = do
(output, input) <- liftIO $ spawn Unbounded
_ <- liftIO $ mapM (fork output) producers
fromInput input
where
fork :: Output a -> Producer a IO () -> IO ()
fork output producer = void $ forkIO $ do runEffect $ producer >-> toOutput output
performGC
按预期工作。
但是我很难概括事物。
我的尝试:
merge :: (MonadIO m) => [Producer a m ()] -> Producer a m ()
merge producers = do
(output, input) <- liftIO $ spawn Unbounded
_ <- liftIO $ mapM (fork output) producers
fromInput input
where
runEffectIO :: Monad m => Effect m r -> IO (m r)
runEffectIO e = do
x <- evaluate $ runEffect e
return x
fork output producer = forkIO $ do runEffectIO $ producer >-> toOutput output
performGC
不幸的是,这可以编译,但并没有做太多其他事情。我猜我把runEffectIO弄得一团糟。我目前的runEffectIO 的其他方法没有产生更好的结果。
程序:
main = do
let producer = merge [repeater 1 (100 * 1000), repeater 2 (150 * 1000)]
_ <- runEffect $ producer >-> taker 20
where repeater :: Int -> Int -> Producer Int IO r
repeater val delay = forever $ do
lift $ threadDelay delay
yield val
taker :: Int -> Consumer Int IO ()
taker 0 = return ()
taker n = do
val <- await
liftIO $ putStrLn $ "Taker " ++ show n ++ ": " ++ show val
taker $ n - 1
点击val <- await,但没有到达liftIO $ putStrLn,因此它不会产生任何输出。但是它没有挂起就可以正常退出。
当我用mergeIO 替换merge 时,程序运行时,我希望输出20 行。
【问题讨论】:
-
如果使用基于推送的管道会更好吗?即
>~>而不是>->?
标签: haskell haskell-pipes