【问题标题】:Fs2 how to fold then append to a StreamFs2如何折叠然后附加到流
【发布时间】:2017-11-21 17:17:45
【问题描述】:

我想追加到Stream

但是下一个流依赖于之前Stream的折叠结果

这是我的做法,但 Stream s 被评估了两次

scasite link

import fs2._

def ints(start: Int) = Stream.iterate(start) { i => 
  println(i)
  i + 1
}.take(10)

val s = ints(0)

def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[Pure, Int]) = {
  s ++ s.fold(init)(f).flatMap(next)
}

val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.toList)

如何实现只评估一次 sfoldAppend 方法。

【问题讨论】:

  • 你确定你写的不是def s而不是val s吗?就目前而言,我不确定是否有足够的信息来回答这个问题。一个完整的例子会很有帮助,这意味着 s 的完整定义。
  • @BrianMcCutchon 我已经添加了 scasite 链接

标签: scala fs2


【解决方案1】:

Brian 的回答是错误的,s 实际上是懒惰的,所以整个流被评估了两次。绑定到s 的变量是严格的,但fs2 中的Stream 是一个惰性流,只有在run 时才会对其进行评估。

您的主要问题是 Pure 不是用于安全实现副作用的 monad,例如 IO。你不应该在纯粹的println。一个有效的例子是:

import cats.effect.IO
import fs2._

def ints(start: Int) = Stream.iterate(start) { i => println(i)
  i + 1
}.take(10)

val s = ints(0)

def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[IO, Int]) = {

  val result = s.covary[IO].runLog
  Stream.eval(result).covary[IO].flatMap {
    s =>
      Stream.emits(s) ++ Stream.emits(s).fold(init)(f).flatMap(next)
  }
}
val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.runLast.unsafeRunSync())

这将评估一次流

【讨论】:

  • 我使用println 只是表示流被评估了两次
  • 这个方案其实是先把所有数据存入内存,然后再发送到一个新的Stream,恐怕无法扩展。
【解决方案2】:

终于用Pull完成工作

implicit class StreamSyntax[F[_], A](s: Stream[F, A]) {
    def foldAppend[S](init: S)(f: (S, A) => S)(next: S => Stream[F, A]): Stream[F, A] = {

      def pullAll(s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
        s.pull.unconsChunk.flatMap {
          case Some((hd, tl)) =>
            Pull.output(hd) *> pullAll(tl)
          case None =>
            Pull.pure(None)
        }
      }

      def foldChunks(i: S, s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
        s.pull.unconsChunk.flatMap {
          case Some((hd, tl)) =>
            val sum: S = hd.toVector.foldLeft(i)(f)
            Pull.output(hd) *> foldChunks(sum, tl)
          case None =>
            pullAll(next(i))
        }
      }
      foldChunks(init, s).stream
    }
  }

【讨论】:

    【解决方案3】:

    您是否考虑过使用scala.collection.immutable.Stream?它已缓存,因此不会被多次评估。

    【讨论】:

      猜你喜欢
      • 2019-11-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-06-14
      • 1970-01-01
      • 1970-01-01
      • 2018-08-28
      相关资源
      最近更新 更多