【问题标题】:What's the best way to run an fs2 stream through multiple pipes, then combine the results?通过多个管道运行 fs2 流然后组合结果的最佳方法是什么?
【发布时间】:2021-06-29 18:16:23
【问题描述】:

我正在尝试构建内容可寻址的文件存储。该过程很简单:获取一个字节流并将其写入临时位置,同时计算流内容的哈希,然后一旦流完成,将完全写入的临时对象移动到基于该哈希的最终位置.

基本上像这个 Conduit 示例,但比支持它的文件系统更强大:

storeObject dataDir srcStream = 
  let
     sinks = liftA2 (,)
       (ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
       (ZipSink sinkHash)
  in do
      (tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
      renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
      return (convert (hash :: Digest SHA256))

对于 fs2,我可以在分叉流 (How do I "split" a stream in fs2?) 上找到的最佳答案让我想到:

  def zipPipes[F[_]: Functor: Concurrent, A, B, C]
   (p1: Pipe[F, A, B], p2: Pipe[F, A, C]): 
   Pipe[F, A, (B, C)] = stream => 
      Stream.eval(for {
        q <- Queue.noneTerminated[F, A]
        } yield {
          stream
            .evalTap(a => q.enqueue1(Some(a)))
            .onFinalize(q.enqueue1(None))
            .through(p1)
            .zip(q.dequeue.through(p2))
        }
      ).flatten[F, (B, C)]

(免责声明:我没有验证上面的代码除了编译之外还有什么作用)

但我不知道,这堆管道看起来很笨拙,以至于我觉得我错过了一个明显的替代方案?

【问题讨论】:

  • 一个字节流就是一个文件?
  • 是的,我知道文件是在流耗尽时写入的。

标签: scala fs2


【解决方案1】:

你并没有真正错过任何东西。您可以与Topic 共享流,例如:

def shareN[F[_]: Concurrent, A](n: Int): fs2.Pipe[F, A, List[fs2.Stream[F, A]]] = { src =>
  fs2.Stream.eval(Topic[F, A]).flatMap { topic =>
    fs2.Stream(List.fill(n)(topic.subscribe(1))).concurrently(
      topic.subscribers.find(_ == n) >> topic.publish(src)
    )
  }
}

会给你一个固定大小的列表,限制你必须并行消费所有东西,否则你会死锁。这可以通过 shapeless 使类型更安全,但这是一个不同的问题。

也很有可能您可以将哈希值敲入fs2.Stream#mapAccumulate 并得到一个元组。

【讨论】:

    猜你喜欢
    • 2023-03-03
    • 2023-03-21
    • 1970-01-01
    • 2018-08-04
    • 1970-01-01
    • 2023-03-28
    • 2018-07-21
    • 2015-10-01
    • 1970-01-01
    相关资源
    最近更新 更多