【问题标题】:Observable piped through `share()` invokes a single observer an unnecessary number of times通过 `share()` 管道的 Observable 调用单个观察者不必要的次数
【发布时间】:2018-09-19 20:14:51
【问题描述】:

我正在尝试测试通过共享/热 observable 发送的消息的延迟。我注意到当我在单个共享 observable 上有多个观察者时,单个观察者会从单个消息中调用 n 次(其中 n 是共享 observable 上的观察者数量)。

我使用 10 个观察者每个观察者 1 条消息 运行下面的代码,每个观察者在每条消息中被调用 10 次(意味着总共 100 次observer.next() 调用)。根据我对观察者/可观察者的理解,每个观察者应该每条消息只调用一次。我只是在这里错误地使用了share() 运算符吗?还是我对它的理解总体上存在缺陷?

const getMessageLatency = (observersCount, messagesPerObserver) => {
    const completedMessages = [];
    const source = new Subject();
    const sharedObservable = source.pipe(
        tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
        share()
    );

    // Setup observers
    for (i = 0; i < observersCount; ++i) {
        sharedObservable
        .pipe(
            tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
            filter((message) => message.id === getObserverId(i)),
            tap(() => console.log(`Filtered for ${getObserverId(i)}`))
        )
        .subscribe((message) => {
            const date = new Date();
            message.endTime = date.getMilliseconds();
            completedMessages.push(message);
        })
    }

    // send out messages
    for (i = 0; i < observersCount; ++i) {
        for (j = 0; j < messagesPerObserver; ++j) {
            const date = new Date();
            const message = {
                id: getObserverId(i),
                startTime: date.getMilliseconds()
            }

            // send message
            source.next(message);
        }
    }

    // process data (get average message latency)
    const totalMessageLatency = completedMessages.reduce(
        (accumulatedLatency, currentMessage) => {
            const currentMessageLatency = 
                currentMessage.endTime - currentMessage.startTime;
            return accumulatedLatency + currentMessageLatency;
        }, 0);
    const averageLatency = totalMessageLatency / completedMessages.length;

    console.log("==============================================================================");
    console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
    console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
    console.log(`Total Messages Received: ${completedMessages.length}`);
    console.log(`Average Latency per Message: ${averageLatency}`);
    console.log("==============================================================================");

    return averageLatency;
}

运行完成后,如果“发送的消息总数”为 x,则“接收的消息总数”将为 x^2

【问题讨论】:

标签: javascript rxjs observable reactivex rxjs-pipeable-operators


【解决方案1】:

在我的 for 循环声明中添加了 let。你们都可以说我也是 JavaScript 新手。

谢谢你

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-23
    • 2021-04-13
    相关资源
    最近更新 更多