【问题标题】:Scala Akka Stream: How to Pass Through a SeqScala Akka Stream:如何通过 Seq
【发布时间】:2015-09-10 07:17:46
【问题描述】:

我试图在Future 中封装一些阻塞调用。返回类型是Seq[User],其中Usercase class。以下内容不会与存在各种重载版本的投诉一起编译。有什么建议么?我尝试了几乎所有变体都是Source.apply,但没有任何运气。

// All I want is Seq[User] => Future[Seq[User]]

def findByFirstName(firstName: String) = {
  val users: Seq[User] = userRepository.findByFirstName(firstName)

  val sink = Sink.fold[User, User](null)((_, elem) => elem)

  val src = Source(users) // doesn't compile

  src.runWith(sink)
}

【问题讨论】:

    标签: scala akka akka-stream reactive-streams


    【解决方案1】:

    首先,我假设您使用的是 akka-http-experimental 的 1.0 版,因为该 API 可能与之前的版本有所不同。

    您的代码无法编译的原因是akka.stream.scaladsl.Source$.apply() 需要 scala.collection.immutable.Seq 而不是 scala.collection.mutable.Seq

    因此,您必须使用to[T] 方法将可变序列转换为不可变序列。

    文档:akka.stream.scaladsl.Source

    此外,如您所见,Source$.apply() 接受 ()=>Iterator[T],因此您也可以将 ()=>users.iterator 作为参数传递。

    由于Sink.fold(...) 返回最后一个计算的表达式,你可以给一个空的Seq() 作为第一个参数,遍历users 并将元素附加到序列中,最后得到结果。

    但是,可能有更好的解决方案可以创建一个Sink,它将每个评估的表达式放入Seq,但我找不到它。

    以下代码有效。

    import akka.actor._
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Source,Sink}
    import scala.concurrent.ExecutionContext.Implicits.global
    
    case class User(name:String)
    
    object Main extends App{
        implicit val system = ActorSystem("MyActorSystem")
        implicit val materializer = ActorMaterializer()
        val users = Seq(User("alice"),User("bob"),User("charlie"))
    
        val sink = Sink.fold[Seq[User], User](Seq())(
          (seq, elem) => 
            {println(s"elem => ${elem} \t| seq => ${seq}");seq:+elem})
    
        val src = Source(users.to[scala.collection.immutable.Seq])
        // val src = Source(()=>users.iterator) // this also works
    
        val fut = src.runWith(sink) // Future[Seq[User]]
        fut.onSuccess({
          case x=>{
            println(s"result => ${x}")
          }
        })
    }
    

    上面代码的输出是

    elem => User(alice)     | seq => List()
    elem => User(bob)       | seq => List(User(alice))
    elem => User(charlie)   | seq => List(User(alice), User(bob))
    result => List(User(alice), User(bob), User(charlie))
    

    【讨论】:

    • 是的,我使用的是 1.0 版。你的答案正是我想要的。我一直觉得有些人只是在提出替代建议之前没有回答所提出的问题,这让我很开心。如果有什么安慰的话,显然我不是第一个遇到包对象 scala 中定义的 Seq 不是不可变版本的别名的情况的人。 Here's 一场涉及 Martin Odersky 的大讨论。
    【解决方案2】:

    如果你只需要 Future[Seq[Users]] 不要使用 akka 流但是 futures

    import scala.concurrent._
    import ExecutionContext.Implicits.global
    val session = socialNetwork.createSessionFor("user", credentials)
    val f: Future[List[Friend]] = Future {
      session.getFriends()
    }
    

    【讨论】:

    • Akka 如此精美地提供的并发控制、异常处理、日志记录在哪里?另外,我的资源使用akka-http,我已经有 Akka Streams,不妨使用它。
    • 使用Futures 似乎更适合userRepository 或任何其他存储库。原因是您的存储库代表某种查询机制,并将您的数据作为一个整体返回。然而,流应该用于数据以卡盘或更小部分进入并且可能无限提供的地方。
    • @VanyaStanislavciuc Reactive Streams 规范或 Akka Streams 中没有任何内容表明流需要是无限的或分块的。分块流是一个用例,而不是必需品。此外,即使我现在只是通过Seq,将来我很可能会添加一些处理逻辑。为未来设计总是更好,尤其是在工具已经可用的情况下。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-05-01
    • 2022-08-23
    • 2017-07-14
    • 1970-01-01
    • 1970-01-01
    • 2020-08-14
    • 2018-02-17
    相关资源
    最近更新 更多