首先,我假设您使用的是 akka-http-experimental 的 1.0 版,因为该 API 可能与之前的版本有所不同。
您的代码无法编译的原因是akka.stream.scaladsl.Source$.apply() 需要
scala.collection.immutable.Seq 而不是 scala.collection.mutable.Seq。
因此,您必须使用to[T] 方法将可变序列转换为不可变序列。
文档:akka.stream.scaladsl.Source
此外,如您所见,Source$.apply() 接受 ()=>Iterator[T],因此您也可以将 ()=>users.iterator 作为参数传递。
由于Sink.fold(...) 返回最后一个计算的表达式,你可以给一个空的Seq() 作为第一个参数,遍历users 并将元素附加到序列中,最后得到结果。
但是,可能有更好的解决方案可以创建一个Sink,它将每个评估的表达式放入Seq,但我找不到它。
以下代码有效。
import akka.actor._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source,Sink}
import scala.concurrent.ExecutionContext.Implicits.global
case class User(name:String)
object Main extends App{
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
val users = Seq(User("alice"),User("bob"),User("charlie"))
val sink = Sink.fold[Seq[User], User](Seq())(
(seq, elem) =>
{println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})
val src = Source(users.to[scala.collection.immutable.Seq])
// val src = Source(()=>users.iterator) // this also works
val fut = src.runWith(sink) // Future[Seq[User]]
fut.onSuccess({
case x=>{
println(s"result => ${x}")
}
})
}
上面代码的输出是
elem => User(alice) | seq => List()
elem => User(bob) | seq => List(User(alice))
elem => User(charlie) | seq => List(User(alice), User(bob))
result => List(User(alice), User(bob), User(charlie))