【问题标题】:How to delay event emission with rxpy/rxjs?如何使用 rxpy/rxjs 延迟事件发射?
【发布时间】:2018-05-17 15:37:24
【问题描述】:

我有两个事件流。一个来自电感回路,另一个来自 IP 摄像机。汽车将驶过环路,然后撞上相机。如果事件在 N 毫秒内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(任何一个硬件都可能失败)都合并到一个流中。像这样的:

           ---> (only unmatched a's, None)
         /                                  \
stream_a (loop)                              \
         \                                    \
            --> (a, b) ---------------------------> (Maybe a, Maybe b)
         /                                    /
stream_b  (camera)                           /
         \                                  /
            --> (None, only unmatched b's)

现在我当然可以通过良好的 ole Subject 反模式来破解我的方法:

unmatched_a = Subject()

def noop():
    pass

pending_as = [[]]

def handle_unmatched(a):
    if a in pending_as[0]:
        pending_as[0].remove(a)
        print("unmatched a!")
        unmatched_a.on_next((a, None))

def handle_a(a):
    pending_as[0].append(a)
    t = threading.Timer(some_timeout, handle_unmatched)
    t.start()
    return a

def handle_b(b):
    if len(pending_as[0]):
        a = pending_as[0].pop(0)
        return (a, b)

    else:
        print("unmatched b!")
        return (None, b)

stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)

这不仅相当 hacky,而且虽然我没有观察到它,但我很确定当我使用 threading.Timer 检查待处理队列时存在竞争条件。鉴于过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用 Subject 的情况下执行此操作,但我无法弄清楚。如何做到这一点?

编辑

虽然出于组织和运营方面的原因,我更喜欢使用 Python,但我会采用 JavaScript rxjs 答案,然后将其移植,甚至可能在 node 中重写整个脚本。

【问题讨论】:

  • 你移植了吗?我问是因为 rxpy 没有像答案使用的 auditTime 这样的东西。
  • @MarcJ.Schmidt 不,我最终使用了主题、线程计时器和无操作订阅问题中描述的黑客。代码的长度是原来的十倍,复杂度是原来的三倍,但我和 ops 一起决定坚持使用 python。不过,接受的答案中描述的 node.js POC 效果很好。
  • 如果您在已接受答案的 cmets 中看到我与 Cartant 的对话,建议您在 Python 中简单地实现 auditTime。我花了一天的大部分时间来尝试做这件事,但我的 Rx/Python 能力不足以完成这项任务。

标签: python-3.x functional-programming rxjs reactive-programming rx-py


【解决方案1】:

您应该能够使用auditTimebuffer 解决问题。像这样:

function matchWithinTime(a$, b$, N) {
  const merged$ = Rx.Observable.merge(a$, b$);
  // Use auditTime to compose a closing notifier for the buffer.
  const audited$ = merged$.auditTime(N);
  // Buffer emissions within an audit and filter out empty buffers.
  return merged$
    .buffer(audited$)
    .filter(x => x.length > 0);
}

const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));

setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

如果 b 值可能紧跟在 a 值之后,并且您不希望它们匹配,则可以使用更具体的审核,如下所示:

const audited$ = merged$.audit(x => x === "a" ?
  // If an `a` was received, audit upcoming values for `N` milliseconds.
  Rx.Observable.timer(N) :
  // If a `b` was received, don't audit the upcoming values.
  Rx.Observable.of(0, Rx.Scheduler.asap)
);

【讨论】:

  • 对不起,我应该更清楚。 a 和 b 分别是来自电感回路和网络摄像机的输入。完全有可能任何一个硬件都可能发生故障,或者汽车行驶速度太慢以至于事件在时间上过于脱节(即毫秒超时),但它必须在到达相机之前进入循环,所以虽然它可能是 a 然后 b 或者只是 a 如果 b 失败或者只是 b 如果 a 失败,它永远不能在 b 后面跟着 a 除非汽车倒退。
  • 也许这更接近你的需要。
  • 这看起来很有希望。我会在早上的第一件事上试一试。 50 是未使用的 N 参数的示例吗?
  • 是的。修好了。
  • @Picci 我使用了auditTime,因为它实现了我想要的逻辑。审计在收到第一个值时开始,在持续时间结束时结束。然后,在收到下一个值之前,下一次审核不会开始——这就是我想要的。该行为在docs 中进行了解释。
【解决方案2】:

我开发了一种与 Cartant 不同的策略,而且显然没有那么优雅,这可能会给你带来不同的结果。如果我没有理解这个问题,如果我的回答被证明是无用的,我深表歉意。

我的策略是在 a$ 上使用 switchMap,然后在 b$ 上使用 bufferTime

这段代码在每个timeInterval 发出,它发出一个对象,其中包含最后收到的a 和代表b 的数组b strong>s 在时间间隔内收到。

a$.pipe(
    switchMap(a => {
        return b$.pipe(
            bufferTime(timeInterval),
            mergeMap(arrayOfB => of({a, arrayOfB})),
        )
    })
)

如果arrayOfB为空,则表示最后一个a不匹配。

如果arrayOfB只有一个元素,则意味着最后一个a已与数组的b匹配。

如果arrayOfB 有多个元素,则意味着最后一个 a 已与数组的第一个 b 匹配,而所有其他 b是无与伦比的。

现在的问题是避免发射相同的a 一次,这就是代码变得有点混乱的地方。

总而言之,代码可能如下所示

const a$ = new Subject();
const b$ = new Subject();

setTimeout(() => a$.next("a1"), 0);
setTimeout(() => b$.next("b1"), 0);
setTimeout(() => a$.next("a2"), 100);
setTimeout(() => b$.next("b2"), 125);
setTimeout(() => a$.next("a3"), 200);
setTimeout(() => b$.next("b3"), 275);
setTimeout(() => a$.next("a4"), 400);
setTimeout(() => b$.next("b4"), 425);
setTimeout(() => b$.next("b4.1"), 435);
setTimeout(() => a$.next("a5"), 500);
setTimeout(() => b$.next("b5"), 575);
setTimeout(() => b$.next("b6"), 700);
setTimeout(() => b$.next("b6.1"), 701);
setTimeout(() => b$.next("b6.2"), 702);
setTimeout(() => a$.next("a6"), 800);


setTimeout(() => a$.complete(), 1000);
setTimeout(() => b$.complete(), 1000);


let currentA;

a$.pipe(
    switchMap(a => {
        currentA = a;
        return b$.pipe(
            bufferTime(50),
            mergeMap(arrayOfB => {
                let aVal = currentA ? currentA : null;
                if (arrayOfB.length === 0) {
                    const ret = of({a: aVal, b: null})
                    currentA = null;
                    return ret;
                }
                if (arrayOfB.length === 1) {
                    const ret = of({a: aVal, b: arrayOfB[0]})
                    currentA = null;
                    return ret;
                }
                const ret = from(arrayOfB)
                            .pipe(
                                map((b, _indexB) => {
                                    aVal = _indexB > 0 ? null : aVal;
                                    return {a: aVal, b}
                                })
                            )
                currentA = null;
                return ret;
            }),
            filter(data => data.a !== null || data.b !== null)
        )
    })
)
.subscribe(console.log);

【讨论】:

  • 看看我的答案的编辑历史。 a 不能保证在 b 之前发生。我认为我的第一个编辑更接近你的。我稍后会回来解释我做了什么。目前很忙。
  • @cartant 你需要多少代表才能看到编辑历史?但是,是的,IIRC 这看起来很像您的第一次尝试。无论如何,谢谢 Picci,请 +1。
  • @JaredSmith 我会认为这是一个相当低的限制,你们可以看到它。只需单击我的答案下的“8 小时前编辑”标签。如果您展开第一个编辑,您应该会看到我的原始答案。
猜你喜欢
  • 1970-01-01
  • 2021-11-13
  • 1970-01-01
  • 2019-05-23
  • 2019-10-08
  • 1970-01-01
  • 2021-09-25
  • 1970-01-01
  • 2018-10-17
相关资源
最近更新 更多