【问题标题】:RxJS Node wait for observable to complete before subscribing to the next in sequenceRxJS 节点等待 observable 完成,然后再按顺序订阅下一个
【发布时间】:2016-12-02 19:51:47
【问题描述】:

我正在尝试使用从谷歌表格中抓取的值填充我的数据库。 hasuraHelper.insert() 将 HTTP 发布到我的数据库。如何使 insert() 仅在上一个 api 调用返回后才被调用?我认为 concatMap 可以解决问题,但它似乎仍然急切地订阅所有排放。

当前:请求 1 -> 请求 2 -> ... -> 请求 1 完成 -> 请求 2 完成 -> ...

我想要什么:请求 1 -> 请求 1 完成 -> 请求 2 -> 请求 2 完成 -> ...

下面是相关代码:

sheetsHelper.authToken()
    .flatMap(sheetsHelper.get)
    .flatMap(response => response.values) //values is a 2d array
    .map(row => {
        console.log(row[0]);
        const flights = [];
        //add 100-500 objects to this array
        return flights;
    })
    .flatMap(flights => {
        const arrayOfFlightArrays = [];
        //max batch size of 50
        while (flights.length) {
            const flightArr = flights.splice(0, 50);
            arrayOfFlightArrays.push(flightArr);
        }
        return Rx.Observable.from(arrayOfFlightArrays);
    })
    .concatMap(flights => hasuraHelper.insert(flights) //insert flights into table
        .retry()) 
    .map(response => response.data)
    .subscribe(console.log, console.error);

这是插入的样子:

exports.insert = function (objects) {
    return Rx.Observable.fromPromise(axios({
        method: 'post',
        url: '/',
        data: {
            type: "insert",
            args: {
                table: "flights",
                objects: objects
            }
        }
    }));
};

【问题讨论】:

    标签: javascript node.js rxjs


    【解决方案1】:

    您的插入函数并不懒惰。你在 Observable 中封装了一个正在执行的 Promise,但它已经在执行了。

    要获得一个承诺变得懒惰并在订阅时开始处理,您需要将其包装在Rx.Observable.defer(() => Rx.Observable.fromPromise(axios(/* ... */))

    【讨论】:

    • 嗯,所以 concatMap 会尽快消耗排放量来创建可观察对象,但只有在前一个完成后才订阅它们?
    • 正确!大多数时候这不是问题,因为常见的模式是 Observable 是冷的(=在订阅之前不做事)。
    猜你喜欢
    • 2021-11-16
    • 2019-09-02
    • 2021-08-28
    • 1970-01-01
    • 2019-08-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-21
    相关资源
    最近更新 更多