【发布时间】:2021-10-21 13:19:33
【问题描述】:
我正在尝试编写一个程序来抓取文件,进行更改,然后将它们保存在其他地方。
所以我的程序看起来像这样:
def processFiles: Stream[F, Unit] =
getFilePaths.flatMap(path =>
downloadFile(path)
.through(scanForViruses)
.through(encrypt)
.through(saveElsewhere)
)
def downloadFile(path: Path): Stream[F, Byte]
def scanForViruses: Pipe[F, Byte, Byte]
def encrypt: Pipe[F, Byte, Byte]
def saveElsewhere: Pipe[F, Byte, Unit]
其中一些步骤可能会失败,并且使用上面的代码,单个失败的文件会阻止其他文件的处理。所以我需要添加一些处理:
getFilePaths.flatMap(path =>
downloadFile(path)
.handleErrorsWith(e => log.error("failed at download", e) >> Stream.empty)
.through(scanForViruses)
.handleErrorsWith(e => log.error("failed at scan", e) >> Stream.empty)
.through(encrypt)
.handleErrorsWith(e => log.error("failed at encrypt", e) >> Stream.empty)
.through(saveElsewhere)
.handleErrorsWith(e => log.error("failed at save", e) >> Stream.empty)
)
这样可以正确捕获错误,但是异常之前的流中的所有元素仍然通过
例如,如果第一个文件流如下所示: Stream(Byte1, Byte2, Exception) 那么 handleErrorWith 将返回 Stream(Byte1, Byte2) 这对于文件没有意义。
另一方面,如果我 rethrow 异常(或不理会它),我会得到我想要的整体行为:它使该特定文件的流失败。但是每个错误处理程序都会在异常上被调用,我没有关于它实际失败的信息:
getFilePaths.flatMap(path =>
downloadFile(path).attempt.evalMap(handler1).rethrow
.through(scanForViruses).attempt.evalMap(handler2).rethrow
.through(encrypt).attempt.evalMap(handler3).rethrow
.through(saveElsewhere).attempt.evalMap(handler4)
)
^^ 如果异常发生在downloadFile 中,上面将调用所有处理程序
有没有办法在第一个异常时使整个流失败,并跟踪失败的位置,而不必每次执行步骤时都编译流?
【问题讨论】:
-
由于是流处理,当
nth 元素在步骤1 失败时,xth 步骤将处理n - xth 元素。这正是流应该做的事情。您需要的是不使用流处理。 -
虽然尝试为你做这件事,但你真的在运行 Stream 吗?或者,它是伪装成 Stream 的批次?