【发布时间】:2018-09-19 20:14:51
【问题描述】:
我正在尝试测试通过共享/热 observable 发送的消息的延迟。我注意到当我在单个共享 observable 上有多个观察者时,单个观察者会从单个消息中调用 n 次(其中 n 是共享 observable 上的观察者数量)。
我使用 10 个观察者 和 每个观察者 1 条消息 运行下面的代码,每个观察者在每条消息中被调用 10 次(意味着总共 100 次observer.next() 调用)。根据我对观察者/可观察者的理解,每个观察者应该每条消息只调用一次。我只是在这里错误地使用了share() 运算符吗?还是我对它的理解总体上存在缺陷?
const getMessageLatency = (observersCount, messagesPerObserver) => {
const completedMessages = [];
const source = new Subject();
const sharedObservable = source.pipe(
tap((message) => console.log(`Subject: Incoming for ${message.id}`)),
share()
);
// Setup observers
for (i = 0; i < observersCount; ++i) {
sharedObservable
.pipe(
tap((message) => console.log(`SharedObservable: Incoming for ${message.id}`)),
filter((message) => message.id === getObserverId(i)),
tap(() => console.log(`Filtered for ${getObserverId(i)}`))
)
.subscribe((message) => {
const date = new Date();
message.endTime = date.getMilliseconds();
completedMessages.push(message);
})
}
// send out messages
for (i = 0; i < observersCount; ++i) {
for (j = 0; j < messagesPerObserver; ++j) {
const date = new Date();
const message = {
id: getObserverId(i),
startTime: date.getMilliseconds()
}
// send message
source.next(message);
}
}
// process data (get average message latency)
const totalMessageLatency = completedMessages.reduce(
(accumulatedLatency, currentMessage) => {
const currentMessageLatency =
currentMessage.endTime - currentMessage.startTime;
return accumulatedLatency + currentMessageLatency;
}, 0);
const averageLatency = totalMessageLatency / completedMessages.length;
console.log("==============================================================================");
console.log(`Observers: ${observersCount}, MessagesPerObserver: ${messagesPerObserver}`);
console.log(`Total Messages Sent: ${observersCount * messagesPerObserver}`);
console.log(`Total Messages Received: ${completedMessages.length}`);
console.log(`Average Latency per Message: ${averageLatency}`);
console.log("==============================================================================");
return averageLatency;
}
运行完成后,如果“发送的消息总数”为 x,则“接收的消息总数”将为 x^2
【问题讨论】:
-
我怀疑部分问题/误解与
for循环var有关。见stackoverflow.com/q/750486/6680611 -
嗯...谢谢!
-
另外,请注意,登录到控制台非常慢,而且毫秒数很粗 - 使用
performance.now() -
谢谢!我删除了测试期间发生的所有控制台日志。还切换到 performance.now() (或者至少是我能找到的 Node 等效项)。这是有问题的实际测试文件供参考:github.com/KeijiBranshi/npm-packages/blob/master/…
标签: javascript rxjs observable reactivex rxjs-pipeable-operators