【问题标题】:RxJS: groupBy emits no values when used with timerRxJS:groupBy 与计时器一起使用时不发出任何值
【发布时间】:2020-06-11 01:41:12
【问题描述】:

RxJS (^6.5.5) 的新手,我对 groupBy 运算符有疑问。下面是一个精简的例子。

我有一个函数retrieveFiles(),它返回一个字符串数组。

function async retrieveFiles(): Promise<string[]> {
  return ['file1.txt', 'file2.txt', 'file3.txt', 'directory1', 'directory2'];
}

实际上,这会从远程源获取一组数据。

假设我想按前 4 个字符对文件名进行分组。

使用RxJS我可以这样做。

of(retrieveFiles()).pipe(
  concatMap(v => v),
  mergeMap(value => value),
  groupBy(
    (name: string) => name.substr(0, 4),
    (name: string) => name,
  ),
  mergeMap(group$ => group$.pipe(toArray())),
)
.subscribe(console.log);

这将向订阅者发出两个值。

[ 'file1.txt', 'file2.txt', 'file3.txt' ]
[ 'directory1', 'directory2' ]

现在让我们引入timer 并稍微修改一下代码。我们现在基本上是在投票。

timer(0, 1000)
  .pipe(
    concatMap(() => this.retrieveFiles()),
    mergeMap(value => value),
    groupBy(
      (name: string) => name.substr(0, 4),
      (name: string) => name,
    ),
    mergeMap(group$ => group$.pipe(toArray())), 
  )
  .subscribe(console.log);

这不再发出任何值。这两者有什么区别?

【问题讨论】:

  • 如果of(retrieveFiles())Observable&lt;string[]&gt; 类型,则此代码中有错误concatMap(v =&gt; v), 无效,它应该是Observable&lt;Promise&lt;string[]&gt;&gt; 吗? mergeMap(value =&gt; value), 部分在这两种情况下都是无效的。
  • 为什么retrieveFiles 需要返回一个 Observable? of 不应该为我创建那个 Observable 吗?
  • of 创建可观察到的 yes,这就是为什么 of(retrieveFiles()) 的类型将是 Observable&lt;Promise&lt;string[]&gt;&gt;
  • 我弄错了,我忘记了 rxjs 让你在新版本中使用数组和 Promise 玩起来更快更松
  • 版本 6.5.5 在这个特定的例子中,我会在问题中提到。谢谢。

标签: typescript rxjs


【解决方案1】:

在第二种情况下,区别在于 timer 永远不会完成,而 of 会完成,而 toArray 仅在源完成时发出,但源没有完成,因为来自 groupBy 的分组可观察对象是期待未来的排放,因为 timer 是它的来源,所以它永远不会完成它的分组观察。

如果您希望为每个投票生成新组,则需要进行一次小的重组,以便将 retrieveFiles 设为其来源

timer(0, 1000)
  .pipe(
    // recommend use switchMap so a hanging request doesn't clog your stream
    switchMap(() => from(retrieveFiles()).pipe( // need from to make it pipeable
      mergeMap(value => value),
      groupBy(
        (name: string) => name.substr(0, 4),
        (name: string) => name,
      ),
      mergeMap(group$ => group$.pipe(toArray())), 
    )),
  )
  .subscribe(console.log);

这里有一个有效的讨论,关于这是否值得。你可以通过一个简单的map 操作符来实现这个目标,它里面有一个同步数组分组函数,而不是所有的来回流转换。如果您只是在探索图书馆,那很酷,但在实践中,我不建议您这样做。 groupBy 用例并不常见,但应保留用于处理要分组为单独的可观察流的实际值流

【讨论】:

  • 开始尝试使用 RxJS,因为它很容易设置轮询并在找到新文件时发出新值。决定看看我是否也可以将它们分组。但是,我可以使用Array.reduce 收集文件并处理 RxJS 之外的分组。我没有观察单独的可观察流的用例。
  • 是的,玩起来很有趣,但如果这是您的目标,map 运算符中的数组 reduce 似乎是解决问题的方法。强烈建议转移到switchMap,但如果您真的使用它进行轮询,因为concatMap 将等待挂起的请求并可以建立背压,而switchMap 将取消并继续前进。
  • 没有意识到toArray 仅在源代码完成时发出。在本地实施修复,就像一个魅力。感谢您指出我的错误和swithMap 的建议。
猜你喜欢
  • 1970-01-01
  • 2022-01-01
  • 1970-01-01
  • 2021-01-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多