【问题标题】:Reactive Programming using RxScala使用 RxScala 的反应式编程
【发布时间】:2015-07-29 23:11:32
【问题描述】:

我有一个通过 Socket 协议连接到服务的 Observable。与套接字的连接是通过客户端库发生的。我使用的客户端库有 java.util.Observer,我可以注册推送到其中的事件

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

我有两个未解决的问题,我不明白。

如何在我的订阅者中获得步骤 3 的结果?

每次当我收到 MyEvent 时,订阅者如下所示,我看到正在创建一个新连接。最终,每个传入事件都会运行第 1 步、第 2 步和第 3 步。

val myObservable = new MyObservale()
myObservable.subscribe()

【问题讨论】:

  • 你的Subscriber在哪里?你能用myObservable.subscribe(mySubscriber)吗?
  • 但是如何将MyObservable类的subscribe方法中Step:3的结果推送到外界呢?
  • 是否有值得考虑的 RxScala 库? Netflix 的 RxScala 怎么样?是否启用了背压?
  • 你不是已经在使用 RxScala 了吗?
  • 我正在使用 monifu 库,这是一个受 Rx.NET 启发的实现。 monifu 图书馆有很多好东西,目前对我来说有点难以掌握!

标签: scala system.reactive rx-java rx-scala


【解决方案1】:

除非我误解了您的问题,否则您只需拨打onNext

def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)

  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}

订阅的代码会执行以下操作:

myObservable.subscribe(onNext = println(_))

【讨论】:

    猜你喜欢
    • 2019-03-16
    • 2016-06-29
    • 2020-05-13
    • 2015-07-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多