【问题标题】:Integrate reactive extensions and twisted, basic example?集成响应式扩展和扭曲的基本示例?
【发布时间】:2015-10-19 14:34:09
【问题描述】:

我正在寻找一个关于如何使用响应式扩展 (RxPY) 和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。

def hello():
    print 'Hello from the reactor loop!'
    print 'Lately I feel like I\'m stuck in a rut.'

from twisted.internet import reactor

reactor.callWhenRunning(hello)

print 'Starting the reactor.'
reactor.run()

我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作...

我能找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:

xs = Observable.from_(range(10))
xs.map(
    lambda x: x * 2
      ).subscribe(print)

或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗? EG,从 Twisted reactor 生成一个 observables 流?

【问题讨论】:

    标签: python reactive-programming rx-py


    【解决方案1】:

    我一直在试图弄清楚同样的事情。这是我的发现。

    启动 RX 数据流的最简单方法是创建 Subject(),它结合了 ObserverObservable。然后您可以使用on_next 方法将数据输入其中。

    Rx 部分:

    from rx.subjects import Subject
    subject=Subject()
    subject.filter(...).map(...).subscribe(my_observer)
    

    从你扭曲的回调中你只需这样做:

    subject.on_next(data_item)
    ...
    subject.on_completed()
    

    或者您可以发出错误信号:

    subject.on_error(my_error)
    

    在学习 RxPy 时,我写过 simple web server using both RxPy and Twisted。是单个python文件,可以作为例子。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多