【问题标题】:Parallel processing in conduit flow管道流中的并行处理
【发布时间】:2014-11-04 18:23:50
【问题描述】:

我非常喜欢用于将操作应用于流式 IO 源的管道/管道概念。我对构建适用于非常大的日志文件的工具很感兴趣。从 Python/Ruby 迁移到 Haskell 的吸引力之一是编写并行代码的更简单方法,但我找不到任何相关文档。我如何设置一个从文件中读取行并并行处理它们的管道流(即,使用 8 个内核,它应该读取 8 行,并将它们交给八个不同的线程进行处理,然后再次收集等),理想情况下,尽可能少的“仪式”......

如果可能会影响流程的速度,可以选择是否需要按顺序重新连接线路?

我确信可以使用 Parallel Haskell 书中的想法自己拼凑一些东西,但在我看来,在 Conduit 工作流程中间并行运行纯函数(parmap 等)应该非常容易?

【问题讨论】:

  • 一般来说,管道(和管道)的概念被设计成顺序的,而不是平行的。特别是,当一个管道请求输入时,只有它的上游管道才运行以产生一个值。所以没有什么可以并行化的。您可以做的是创建一个内部并行化的管道 - 接收输入,调度任务以处理它们并产生它们的输出。
  • 我只是觉得很奇怪,因为很多 Haskell 的支持者都提到了多核处理器日益重要的重要性,以及 Haskell 作为一种纯函数式语言在这方面的真正亮点。每当你谈到在 Haskell 中处理大量数据时,都会提到管道和管道是最好的框架。所以我会认为在管道/导管中并行处理应该是“微不足道的”......
  • 有两个不同的概念:concurrency and parallelism(我应该在我的第一条评论中更清楚地说明这个区别)。并行性在纯语言中是很自然的,它不依赖于任何框架,您只需启动几个 spark 即可在多个内核上计算纯函数,例如管道中的 fpr。另一方面,并​​发(多线程)是显式且不确定的,并且在概念上与管道/管道不同。

标签: haskell conduit


【解决方案1】:

作为 Petr Pudlák 在他的评论中提到的“内部并行性”的一个例子,考虑这个函数(我正在使用 pipes,但可以很容易地用 conduit 实现):

import Control.Monad
import Control.Lens (view)
import Control.Concurrent.Async (mapConcurrently)
import Pipes
import qualified Pipes.Group as G
import qualified Control.Foldl as L

concProd :: Int -> (a -> IO b) -> Producer a IO r -> Producer b IO r
concProd groupsize action producer = 
      L.purely G.folds L.list (view (G.chunksOf groupsize) producer)
      >->
      forever (await >>= liftIO . mapConcurrently action >>= mapM G.yield) 

此函数将组大小、我们要为a 类型的每个值运行的操作以及Producera 值作为参数。

它返回一个新的Producer。在内部,生产者分批读取agroupsize,并发处理,并一一产生结果。

代码使用Pipes.Group将原始生产者“划分”为大小为groupsize的子生产者,然后Control.Foldl将每个子生产者“折叠”成一个列表。

对于更复杂的任务,您可以使用pipes-concurrencystm-conduit 提供的异步通道。但这些让你在某种程度上摆脱了香草管道/导管的“单一管道”世界观。

【讨论】:

  • 这很漂亮。非常感谢!我一直在考虑这个问题,似乎 Pipes.Group 提供了一种自然的方式来将管道流分组为块并并行处理它们。我从来没有想过使用purely folds list 创建块。确实很不错!
猜你喜欢
  • 2016-02-26
  • 2017-07-20
  • 2017-12-31
  • 2017-07-16
  • 1970-01-01
  • 1970-01-01
  • 2012-10-03
  • 2019-05-18
  • 2017-07-07
相关资源
最近更新 更多