【问题标题】:Is there an Iteratee-like concept which pulls data from multiple sources?是否有类似 Iteratee 的概念从多个来源提取数据?
【发布时间】:2012-09-11 21:02:23
【问题描述】:

可以使用流(惰性列表)从多个(为简单起见说两个)源中按需提取。迭代器可用于处理来自单一来源的数据。

是否有类似 Iteratee 的功能概念来处理多个输入源?我可以想象一个 Iteratee 的状态信号,它想从哪个源提取。

【问题讨论】:

  • 所以你有两个来源,你想从哪个最先获得事件?
  • 从多个来源中提取的任何方式都可能等同于将两个来源合并为一个的某种方式。有很多方法可以做到这一点;你能澄清一下你在寻找什么行为吗?
  • 我想到了一些通用的东西,接收器可以根据状态控制要拉哪个源。具体的场景是合并两个排序的流,所以我得到元素的键相等的元组,并且不成对的元素被丢弃。
  • 任何 iteratee 库都可以通过简单地堆叠 monad 来做到这一点。奥列格首先展示了如何(okmij.org/ftp/Streams.html#2enum1iter)。我已经将它与我自己的 iteratee 包一起使用。它适用于其中任何一个。
  • @John:谢谢!顺便说一句,我记得我已经听过“奥列格先出现”这句话:)

标签: scala haskell functional-programming iterate


【解决方案1】:

要使用管道来做到这一点,您将 Pipe monad 转换器嵌套在其自身中,对于您希望与之交互的每个生产者一次。例如:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

就像多个变量的 Haskell 柯里化函数一样,您可以使用组合和 runPipe 将其部分应用于每个源:

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

上述函数运行时输出:

>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)

此技巧适用于让或等待上游或下游任意数量的管道。它也适用于代理,即管道的双向类似物。

编辑:请注意,这也适用于任何迭代库,而不仅仅是pipes。事实上,John Milikin 和 Oleg 是这种方法的最初倡导者,我只是从他们那里窃取了这个想法。

【讨论】:

  • 我只想指出,这适用于所有其他 iteratee 包,而不仅仅是管道。
【解决方案2】:

我们在 Scala 中使用 Machines 来获取不只是两个,而是任意数量的源。

库本身在Tee 模块上提供了两个二元连接示例:mergeOuterJoinhashJoinhashJoin 的代码如下所示(假设两个流都已排序):

/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

此代码构建了一个Plan,它使用repeatedly 方法“编译”为Machine。这里构建的类型是Tee[A, B, (A, B)],它是一台有两个输入的机器。你用awaits(left)awaits(right)请求左右输入,用emit输出。

还有一个Haskell version of Machines

【讨论】:

    【解决方案3】:

    Conduits(它可以为 Pipes 构建,但该代码尚未发布)有一个 zip 原语,它接受两个上游并将它们组合为一个元组流。

    【讨论】:

      【解决方案4】:

      查看pipes libraryvertical concatenation 可能会满足您的需求。例如,

      import Control.Pipe
      import Control.Monad
      import Control.Monad.State
      import Data.Void
      
      source0, source1 :: Producer Char IO ()
      source0 = mapM_ yield "say"
      source1 = mapM_ yield "what"
      
      sink :: Show b => Consumer b IO ()
      sink = forever $ await >>= \x -> lift $ print x
      
      pipeline :: Pipe () Void IO ()
      pipeline = sink <+< (source0 >> source1)
      

      排序运算符(&gt;&gt;) 垂直连接源,产生输出(在runPipe 上)

      's'
      'a'
      'y'
      'w'
      'h'
      'a'
      't'
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-06-06
        • 2012-07-26
        • 2020-11-25
        • 1970-01-01
        • 2023-04-04
        • 1970-01-01
        相关资源
        最近更新 更多