【发布时间】:2015-10-19 14:34:09
【问题描述】:
我正在寻找一个关于如何使用响应式扩展 (RxPY) 和 Twisted 的非常基本的示例。这是一个使用 Twisted 流式传输消息的最小 hello 应用程序。
def hello():
print 'Hello from the reactor loop!'
print 'Lately I feel like I\'m stuck in a rut.'
from twisted.internet import reactor
reactor.callWhenRunning(hello)
print 'Starting the reactor.'
reactor.run()
我想使用 RxPY 库来挂钩这些流(如果这样更容易,它们不必打印到屏幕上),并执行映射、过滤等规范操作...
我能找到的所有 RxPY 示例都可以生成它们自己的流,例如从可迭代的,以下代码流整数 0-9:
xs = Observable.from_(range(10))
xs.map(
lambda x: x * 2
).subscribe(print)
或者包含在更复杂的示例中(例如子类化 WebSocket 处理程序)。知道如何拦截打印消息吗? EG,从 Twisted reactor 生成一个 observables 流?
【问题讨论】:
标签: python reactive-programming rx-py