【发布时间】:2021-06-29 18:16:23
【问题描述】:
我正在尝试构建内容可寻址的文件存储。该过程很简单:获取一个字节流并将其写入临时位置,同时计算流内容的哈希,然后一旦流完成,将完全写入的临时对象移动到基于该哈希的最终位置.
基本上像这个 Conduit 示例,但比支持它的文件系统更强大:
storeObject dataDir srcStream =
let
sinks = liftA2 (,)
(ZipSink (sinkTempFile (dataDir </> "tmp") "ftmp"))
(ZipSink sinkHash)
in do
(tempfile, hash) <- runConduitRes (srcStream .| getZipSink sinks)
renameFile tempfile (dataDir </> "data" </> unpack (convert hash))
return (convert (hash :: Digest SHA256))
对于 fs2,我可以在分叉流 (How do I "split" a stream in fs2?) 上找到的最佳答案让我想到:
def zipPipes[F[_]: Functor: Concurrent, A, B, C]
(p1: Pipe[F, A, B], p2: Pipe[F, A, C]):
Pipe[F, A, (B, C)] = stream =>
Stream.eval(for {
q <- Queue.noneTerminated[F, A]
} yield {
stream
.evalTap(a => q.enqueue1(Some(a)))
.onFinalize(q.enqueue1(None))
.through(p1)
.zip(q.dequeue.through(p2))
}
).flatten[F, (B, C)]
(免责声明:我没有验证上面的代码除了编译之外还有什么作用)
但我不知道,这堆管道看起来很笨拙,以至于我觉得我错过了一个明显的替代方案?
【问题讨论】:
-
一个字节流就是一个文件?
-
是的,我知道文件是在流耗尽时写入的。