【问题标题】:How to properly recombine grouped observables?如何正确重组分组的 observables?
【发布时间】:2017-05-06 06:10:09
【问题描述】:

我正在尝试创建一个分析股票价格的工具。

我有一个不同股票的价格数据流,我希望有一个 observable 在收到一组新的、不同的和完整的价格时发出事件。

我的计划:针对不同的股票将流分成不同的子流,并重新组合它们的最新值。

假设我有一个这样的事件流:

from rx import Observable

stock_events = [
    {'stock': 'A', 'price': 15},
    {'stock': 'A', 'price': 16},
    {'stock': 'B', 'price': 24},
    {'stock': 'C', 'price': 37},
    {'stock': 'A', 'price': 18},
    {'stock': 'D', 'price': 42},
    {'stock': 'B', 'price': 27},
    {'stock': 'B', 'price': 27},
    {'stock': 'C', 'price': 31},
    {'stock': 'D', 'price': 44}
]

price_source = Observable.from_list(stock_events)

这是我的第一个(天真的)方法:

a_source = price_source.filter(lambda x: x['stock'] == 'A').distinct_until_changed()
b_source = price_source.filter(lambda x: x['stock'] == 'B').distinct_until_changed()
c_source = price_source.filter(lambda x: x['stock'] == 'C').distinct_until_changed()
d_source = price_source.filter(lambda x: x['stock'] == 'D').distinct_until_changed()

(Observable
    .combine_latest(a_source, b_source, c_source, d_source, lambda *x: x)
    .subscribe(print))

这正确地给了我:

({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 24}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 37}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 42})
({'stock': 'A', 'price': 18}, {'stock': 'B', 'price': 27}, {'stock': 'C', 'price': 31}, {'stock': 'D', 'price': 44})

不过,我觉得这应该是group_by更好的处理,而不是几个过滤,所以这里重新写一下:

(price_source
 .group_by(lambda e: e['stock'])
 .map(lambda obs: obs.distinct_until_changed())
 .combine_latest(lambda *x: x)
 .subscribe(print))

但这一次,我明白了:

(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000105EA20>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776AB00>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000776A438>,)
(<rx.core.anonymousobservable.AnonymousObservable object at 0x000000000775E7F0>,)

我在这里错过了什么?如何“解包”嵌套的 observable?

【问题讨论】:

  • 我不确定 GroupBy 是否适合您的情况。由于您不想在所有股票都发出之前输出元素,这意味着您知道股票代码是什么。我认为Combine latest 非常适合您的用例。您可能希望通过 Publish 共享底层子组件。
  • 你有没有解决过这个问题。如果可以的话,你能发布一个答案

标签: python-3.x system.reactive reactive-programming reactivex rx-py


【解决方案1】:

如果您确实想使用 groupby,那么它在 C# 中将类似于下面的内容。但是,这不符合您对“完整”集的要求。根据 cmets,怀疑 CombineLatest 在这里会更好。

price_source.GroupBy(x => x.Stock)
            .Select(gp => gp.DistinctUntilChanged(x => x.Price))
            .SelectMany(x => x)
            .Subscribe(s => Console.WriteLine($"{s.Stock} : {s.Price}"));

【讨论】:

    猜你喜欢
    • 2014-12-20
    • 1970-01-01
    • 2017-02-09
    • 2012-06-18
    • 2022-01-11
    • 2020-07-07
    • 2019-03-17
    • 1970-01-01
    • 2013-12-11
    相关资源
    最近更新 更多