【问题标题】:Combining parts of Stream组合 Stream 的各个部分
【发布时间】:2015-01-22 09:50:47
【问题描述】:

我也有一个观察者正在观察一个不断被写入的日志。每一行都是一个新的 onNext 调用。有时,日志会在多行中输出单个日志项。检测到这一点很容易,我只是找不到正确的 RX 调用。

我想找到一种方法将单个日志项收集到一个行列表中,并在单个日志项完成时将列表中的 onNext 收集起来。

Buffer 似乎不对,因为这不是基于时间的,而是基于算法的。

GroupBy 可能是我想要的,但文档对此感到困惑。似乎它创建的可观察对象在源可观察对象完成之前可能不会调用 onComplete。

此解决方案不能过多地延迟日志(最好一点也不)。我需要尽可能接近实时地阅读日志,并且订单很重要。

任何朝着正确方向的推动都会很棒。

【问题讨论】:

标签: system.reactive rx-java


【解决方案1】:

这是一个典型的反应式解析问题。您可以使用Rxx Parsers,或者对于本机解决方案,您可以使用Scan 或定义async iterator 来构建自己的状态机。 Scan 更适合简单的解析器,并且经常使用Scan-Where-Select 模式。

异步迭代器状态机示例:Turnstile

Scan 解析器示例(未经测试):

IObservable<string> lines = ReadLines();
IObservable<IReadOnlyList<string>> parsed = lines.Scan(
  new
  {
    ParsingItem = (IEnumerable<string>)null,
    Item = (IEnumerable<string>)null
  },
  (state, line) =>
    // I'm assuming here that items never span lines partially.
    IsItem(line)
    ? IsItemLastLine(line)
      ? new
        {
          ParsingItem = (IEnumerable<string>)null,
          Item = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line)
        }
      : new
        {
          ParsingItem = (state.ParsingItem ?? Enumerable.Empty<string>()).Concat(line),
          Item = (List<string>)null
        }
    : new
      {
        ParsingItem = (IEnumerable<string>)null,
        Item = new[] { line }
      })
  .Where(result => result.Item != null)
  .Select(result => result.Item.ToList().AsReadOnly());

【讨论】:

  • 感谢您的回复。 Rxx 是不行的,因为我实际上使用的是 Rx-Java,它是 netflix 所做的库的 Java 端口。扫描看起来不错。我会验证它是否符合我的要求,然后将您的答案标记为完整,或者如果它不起作用,请在此处说些什么。
猜你喜欢
  • 2020-07-07
  • 2021-05-22
  • 1970-01-01
  • 2011-12-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多