【发布时间】:2018-05-17 15:37:24
【问题描述】:
我有两个事件流。一个来自电感回路,另一个来自 IP 摄像机。汽车将驶过环路,然后撞上相机。如果事件在 N 毫秒内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(任何一个硬件都可能失败)都合并到一个流中。像这样的:
---> (only unmatched a's, None)
/ \
stream_a (loop) \
\ \
--> (a, b) ---------------------------> (Maybe a, Maybe b)
/ /
stream_b (camera) /
\ /
--> (None, only unmatched b's)
现在我当然可以通过良好的 ole Subject 反模式来破解我的方法:
unmatched_a = Subject()
def noop():
pass
pending_as = [[]]
def handle_unmatched(a):
if a in pending_as[0]:
pending_as[0].remove(a)
print("unmatched a!")
unmatched_a.on_next((a, None))
def handle_a(a):
pending_as[0].append(a)
t = threading.Timer(some_timeout, handle_unmatched)
t.start()
return a
def handle_b(b):
if len(pending_as[0]):
a = pending_as[0].pop(0)
return (a, b)
else:
print("unmatched b!")
return (None, b)
stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)
这不仅相当 hacky,而且虽然我没有观察到它,但我很确定当我使用 threading.Timer 检查待处理队列时存在竞争条件。鉴于过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用 Subject 的情况下执行此操作,但我无法弄清楚。如何做到这一点?
编辑
虽然出于组织和运营方面的原因,我更喜欢使用 Python,但我会采用 JavaScript rxjs 答案,然后将其移植,甚至可能在 node 中重写整个脚本。
【问题讨论】:
-
你移植了吗?我问是因为 rxpy 没有像答案使用的
auditTime这样的东西。 -
@MarcJ.Schmidt 不,我最终使用了主题、线程计时器和无操作订阅问题中描述的黑客。代码的长度是原来的十倍,复杂度是原来的三倍,但我和 ops 一起决定坚持使用 python。不过,接受的答案中描述的 node.js POC 效果很好。
-
如果您在已接受答案的 cmets 中看到我与 Cartant 的对话,建议您在 Python 中简单地实现 auditTime。我花了一天的大部分时间来尝试做这件事,但我的 Rx/Python 能力不足以完成这项任务。
标签: python-3.x functional-programming rxjs reactive-programming rx-py