【问题标题】:Can't use reactivestream Subscriber with akka stream sources不能将反应流订阅者与 akka 流源一起使用
【发布时间】:2017-02-08 21:24:10
【问题描述】:

我正在尝试将响应流订阅者附加到 akka 源。

我的来源似乎可以与一个简单的接收器(如 foreach)一起正常工作 - 但如果我放入一个由订阅者制作的真正接收器,我什么也得不到。

我的上下文是:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}

implicit val system =  ActorSystem.create("test")
implicit val materializer = ActorMaterializer.create(system)

class PrintSubscriber extends Subscriber[String] {
  override def onError(t: Throwable): Unit = {}
  override def onSubscribe(s: Subscription): Unit = {}
  override def onComplete(): Unit = {}
  override def onNext(t: String): Unit = {
    println(t)
  }
}

我的测试用例是:

val subscriber = new PrintSubscriber()
val sink = Sink.fromSubscriber(subscriber)

val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc"))
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz"))
source1.to(sink).run()(materializer)
source2.runForeach(println)

我得到输出:

aaa
bbb
ccc

为什么我没有得到 xxx、yyy 和 zzz?

【问题讨论】:

    标签: akka-stream reactive-streams


    【解决方案1】:

    引用下面Subscriber 的 Reactive Streams 规范:

    在传递一个之后会收到一次 onSubscribe(Subscription) 调用 Publisher.subscribe(Subscriber) 的订阅者实例。没有进一步的 在 Subscription.request(long) 之前将收到通知 调用。

    看到某些项目流向订阅者,您可以做出的最小更改是

    override def onSubscribe(s: Subscription): Unit = {
      s.request(3)
    }
    

    但是,请记住,这不会使其完全符合 Reactive Streams specs。不太容易实现是 Akka-Streams 本身等高级工具包背后的主要原因。

    【讨论】:

      猜你喜欢
      • 2018-08-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-05-03
      相关资源
      最近更新 更多