【问题标题】:Fire async request in parallel but get result in order using rxjs并行触发异步请求,但使用 rxjs 按顺序获取结果
【发布时间】:2016-04-01 21:56:01
【问题描述】:

例如:

使用 jquery ajax 并行获取 5 个页面。当 page2 返回时,什么也不做。当 page1 返回时,对 page1 和 page2 做一些事情。

// assume there is some operator that can do this, 
// then it might look like this?
Rx.Observable.range(1, 5).
someOperator(function(page) {
  return Rx.Observable.defer( () => $.get(page) );
}).scan(function(preVal, curItem) {
  preVal.push(curItem);
  return preVal;
}, []);

【问题讨论】:

标签: javascript rxjs


【解决方案1】:

存在forkJoin 运算符,它将run all observable sequences in parallel and collect their last elements.(引用自文档)。但是如果你使用那个,你将不得不等待所有 5 个 promise 都解决,或者 5 个中的一个出错。它与RSVP.alljQuery.when 非常接近。因此,一旦您拥有第二个,那将不允许您做某事。无论如何我都会提到它,以防它在其他情况下对您有用。

另一种可能性是使用concatMap,这将允许您按顺序接收已解决的承诺。但是,我不清楚它们是否会并行启动,第二个 promise 应该只有在第一个 promise 解决后才开始。

我能想到的最后一个选项是使用 merge(2),它应该并行运行两个 Promise,并且在任何时候它们都只会是两个 Promise 被“启动”。

现在,如果您不使用defer,而使用concatMap,我相信您应该启动了所有AJAX 请求,并且仍然是正确的顺序。所以你可以写:

.concatMap(function(page) {
  return $.get(page);
})

相关文档:

【讨论】:

  • 如果我使用defer,那么请求会一个接一个地发送。没有延迟,它们是并行发送的,正是我想要的。谢谢。
  • 知道 RXJS 5 的答案是什么吗?谢谢。
【解决方案2】:

concatMap 保持顺序,但按顺序处理元素。

mergeMap 不保持顺序,而是并行运行。

我在下面创建了操作符 mergeMapAsync 以使流程元素(例如页面下载)并行但按顺序发出。它甚至支持限制(例如,最多并行下载 6 个页面)。

Rx.Observable.prototype.mergeMapAsync = mergeMapAsync;
function mergeMapAsync(func, concurrent) {
    return new Rx.Observable(observer => {
        let outputIndex = 0;
        const inputLen = this.array ? this.array.length : this.source.array.length;
        const responses = new Array(inputLen);

        const merged = this.map((value, index) => ({ value, index })) // Add index to input value.
            .mergeMap(value => {
                return Rx.Observable.fromPromise(new Promise(resolve => {
                    console.log(`${now()}: Call func for ${value.value}`);  
                    // Return func retVal and index.
                    func(value.value).then(retVal => {
                        resolve({ value: retVal, index: value.index });
                    });
                }));
            }, concurrent);

        const mergeObserver = {
            next: (x) => {
                console.log(`${now()}: Promise returned for ${x.value}`);
                responses[x.index] = x.value;

                // Emit in order using outputIndex.
                for (let i = outputIndex, len = responses.length; i < len; i++) {
                    if (typeof responses[i] !== "undefined") {
                        observer.next(responses[i]);
                        outputIndex = i + 1;
                    } else {
                        break;
                    }
                }
            },
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return merged.subscribe(mergeObserver);
    });
};

// ----------------------------------------
const CONCURRENT = 3;
var start = Date.now();
var now = () => Date.now() - start;

const array = ["a", "b", "c", "d", "e"];
Rx.Observable.from(array)
    .mergeMapAsync(value => getData(value), CONCURRENT)
    .finally(() => console.log(`${now()}: End`))
    .subscribe(value => {
        console.log(`${now()}: ${value}`); // getData
    });

function getData(input) {
    const delayMin = 500; // ms
    const delayMax = 2000; // ms

    return new Promise(resolve => {
        setTimeout(() => resolve(`${input}+`), Math.floor(Math.random() * delayMax) + delayMin);
    });
}
<!DOCTYPE html>
<html>
<head>
  <meta charset="utf-8">
  <meta name="viewport" content="width=device-width">
  <title>mergeMapAsync</title>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.min.js"></script>
</head>
<body>

</body>
</html>

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-09-14
    • 1970-01-01
    • 2015-09-17
    • 2020-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多