【问题标题】:How to split a stream into multiple streams in sequence如何将一个流按顺序拆分成多个流
【发布时间】:2019-04-15 10:07:32
【问题描述】:

使用 RxJS 6,

我有一个任意数据流:

[in] -> a, b, c, d, e, f, g, h, i, ....

我想以交替顺序将其拆分为固定数量的 N 个流(在本例中为 3 个输出流):

[out] -> stream1 -> a, d, g
      -> stream2 -> b, e, h
      -> stream3 -> c, f, i

或更简单地说:

a => stream1
b => stream2
c => stream3
d => stream1
e => stream2
f => stream3
g => stream1
h => stream2
i => stream3

有人知道我该怎么做吗?

【问题讨论】:

  • 所以你想要一个 Observable 发射其他 Observables(比如 groupBy 操作符)或者只是三个不同的 Observables?
  • emmiting 到其他三个固定的 observables,实际上是一个数组,应该提前创建,主要是在第一次确定它们的数量 (N) 时

标签: javascript rxjs rxjs6


【解决方案1】:

您可以迭代 N 并在每次迭代时使用 partition 将您的流一分为二:

import { from, merge } from 'rxjs';
import { partition, map } from 'rxjs/operators';

const source = from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i']);

function split(source, n) {
  const streams = [];
  let toSplit = source;
  for (let k = n; k > 0; --k) {
    const [stream, rest] = toSplit.pipe(
      partition((_, i) => i % k === 0)
    );
    streams.push(stream);
    toSplit = rest;
  }
  return streams;
}

const obs = split(source, 3);

const subscribe = merge(
  obs[0].pipe(map(val => `1: ${val}`)),
  obs[1].pipe(map(val => `2: ${val}`)),
  obs[2].pipe(map(val => `3: ${val}`)),
).subscribe(val => console.log(val));

See this StackBlitz example.

【讨论】:

  • 谢谢,我有一种感觉,它会是这样的,只是不知道你可以在分区回调中获取索引。我认为这是实现这一目标的唯一合理方法?
  • @user2765977,不客气。这是我认为最直接的方法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2013-06-11
  • 2020-01-08
  • 1970-01-01
  • 2016-06-29
  • 1970-01-01
  • 2019-01-12
  • 1970-01-01
相关资源
最近更新 更多