【问题标题】:RxJS SchedulersRxJS 调度程序
【发布时间】:2021-02-01 18:07:36
【问题描述】:

我正在一个简单的 NodeJS Express 服务器中使用 RxJS 做一些简单的实验,我正在比较处理和处理请求的不同方法(基于这篇文章 https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/)。这是基本设置:

const express = require('express')
const crypto = require('crypto')
const { asyncScheduler, asapScheduler, range } = require('rxjs')
const { promisify } = require('util')

const setImmediatePromise = promisify(setImmediate)

const PID = process.pid

function log(msg) {
    console.log(`[${PID}]`, new Date(), msg)
}

const app = express()

function randomString() {
    return crypto.randomBytes(100).toString('hex')
}

app.get('/compute-sync', function computeSync(req, res) {
    log('computing sync!')
    const hash = crypto.createHash('sha256')
    for (let i = 0; i < 1e6; i++) {
        hash.update(randomString())
    }
    res.send(hash.digest('hex') + '\n')
})

app.get('/compute-immediate', function computeImmediate(req, res) {
    log('computing immediate!')

    const hash = crypto.createHash('sha256')

    for (let i = 0; i < 1e6; i++) {
        await setImmediatePromise(hash.update, randomString())
    }

    res.send(hash.digest('hex') + '\n')
})

app.get('/compute-rxjs', async function computeRxjs(req, res) {
    log('computing Rxjs!')

    const hash = crypto.createHash('sha256')

    range(0, 1e6, asapScheduler).subscribe({
        next() {
            hash.update(randomString())
        },
        complete() {
            res.send(hash.digest('hex') + '\n')
        },
    })
})

app.get('/healthcheck', function healthcheck(req, res) {
    log('they check my health')
    res.send('all good!\n')
})

const PORT = process.env.PORT || 1337
let server = app.listen(PORT, () => log('server listening on :' + PORT))

据我了解,asapScheduler 会在后台使用 setImmediate,那么为什么 /compute-immediate 端点 NOT 会阻止事件循环(使服务器对新请求保持响应)但是/compute-rxjs 是否会阻塞并导致运行状况端点上的服务器超时?

我也尝试过asyncScheduler - 这不会阻塞,但它可能需要比/compute-immediate 端点长一个数量级才能完成。

我真的很想使用 RxJS 对传入请求进行更复杂的处理,但我觉得这个问题让这个选择变得不可取。有什么我想念的吗?有没有办法让 RxJS 解决方案以与 setImmediate 解决方案相同的方式工作?

【问题讨论】:

  • 它不使用setImmediate,而是Promise.then。由于在微任务队列为空之前它不会进入下一个滴答,我认为这就是它阻塞的原因。
  • computeRxjs 是异步的,而其余的都不是?我会避免混淆承诺和可观察的。他们可以一起玩得很好,但我已经看到了足够多令人震惊的角落案例,除非必要,否则避免混合(图书馆需要一个或另一个)。

标签: node.js express rxjs


【解决方案1】:

感谢您的回复!

我现在有了一个基于此要点的解决方案:https://gist.github.com/neilk/5380684

/compute-rxjs 现在看起来像这样:

app.get('/compute-rxjs', function computeRxjs(req, res) {
    log('computing Rxjs!')

    const hash = crypto.createHash('sha256')

    new Observable(subscriber => {
        ;(iter = (i = 0, max = 1e6) => {
            if (i === max) return subscriber.complete()

            subscriber.next(i)
            return setImmediate(iter, i + 1, max)
        })()
    }).subscribe({
        next() {
            hash.update(randomString())
        },
        complete() {
            res.send(hash.digest('hex') + '\n')
        },
    })
})

它的行为似乎完全符合我的要求(递归 - 谁知道?) - 它不会阻塞事件循环,并且运行时间与 /compute-immediate 端点相同,但给了我使用的灵活性RxJS 管道功能。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-05-07
    • 1970-01-01
    相关资源
    最近更新 更多