【问题标题】:Recommented way to implement observable collections in Python?在 Python 中实现可观察集合的推荐方法?
【发布时间】:2021-05-02 12:29:00
【问题描述】:

我想在 Python 中有一些可观察的集合/序列,让我能够监听更改事件,例如添加新项目或更新项目:

list = ObservableList(['a','b','c'])
list.addChangeListener(lambda new_value: print(new_value))
list.append('a') # => should trigger the attached change listener

data_frame = ObservableDataFrame({'x': [1,2,3], 'y':[10,20,30]})
data_frame.addChangeListener(update_dependent_table_cells) # => allows to only update dependent cells instead of a whole table

A. 我发现以下项目提供了可观察集合的实现,并且看起来很有希望:

https://github.com/dimsf/Python-observable-collections

它做我想做的事:

from observablelist import ObservableList

def listHandler(event):
    if event.action == 'itemsUpdated':
        print event.action + ', old items: ' + str(event.oldItems) + ' new items: ' + str(event.newItems) + ' at index: ' + str(event.index)
    elif event.action == 'itemsAdded' or event.action == 'itemsRemoved':
        print(event.action + ', items: ' + str(event.items) + ' at index: ' + str(event.index))

myList = ObservableList()
myList.attach(listHandler)

#Do some mutation actions, just like normal lists.
myList.append(10)
myList.insert(3, 0)

不过,最后一次更改是在 6 年前,我想知道是否有更多最新的或内置 Python 替代方案

B.我还发现了 RxPy:https://github.com/ReactiveX/RxPY

import rx
list = ["Alpha", "Beta", "Gamma"]
source = rx.from_(list)
source.subscribe(
   lambda value: print(value),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
) 

是否有可能保持订阅打开,以便我能够在订阅之后将新值附加到列表中?虚拟代码:

source.subscribe(..., keep_open = True)
source.append("Delta")  # <= does not work; there is no append method
source.close()

换句话说:我可以/应该使用 RxPy 源作为可观察的集合吗?

C. Python 中似乎存在许多不同的可能性来处理事件和实现观察者模式:

Event system in Python

Python Observer Pattern: Examples, Tips?

alternate ways to implement observer pattern in python

Using decorators to implement Observer Pattern in Python3

=> 在 Python 中实现可观察集合的推荐/pythonic 方法是什么?我应该使用(过时的?)A. 还是 B. 的改编形式(似乎有不同的目的?)甚至 C. 的另一种策略?

=> 是否有计划以某种方式标准化这些可能性并直接在 Python 中包含默认的 observable 集合?

相关问题,特定于 DataFrames:

How to make tables/spreadsheets (e.g. pandas DataFrame) observable, use triggers or change events?

【问题讨论】:

    标签: python collections observable sequence observer-pattern


    【解决方案1】:

    我从未使用过 RxPy,但它似乎是一种非常接近 js/ts 的 rx 模式实现。

    首先,您需要一个可观察对象,您可以使用它来将数据推送到它和观察者中。那是subject,可能是行为主题或重播主题。创建主题,然后使用 on_next() 运算符将新值推入其中。

    对于第二个问题,您似乎想将多个可观察对象“组合”成一个可观察对象。有多种方法可以做到这一点,但最有可能的是,您正在寻找的是 CombineLatest 或 Concat。掠夺operators

    如果我以您的第二个示例为例,代码将如下所示:

    from rx.subject.subject import Subject
    
    list = ["Alpha", "Beta", "Gamma"]
    # assuming that you want each item to be emitted one after the other
    subject = Subject()
    subject.subscribe(
        lambda value: print(value),
        on_error = lambda e: print("Error : {0}".format(e)),
        on_completed = lambda: print("Job Done!")
    )
    subject.on_next('Alpha')
    subject.on_next('Beta')
    subject.on_next('Gamma')
    subject.on_next('Delta')
    

    如果您使用 BehaviourSubject,您将能够提供一个初始值,并且当一个新的观察者订阅时,它将接收最后一个发出的值。 如果您使用 ReplaySubject,您可以提供值,然后订阅,观察者将接收到该主题发出的所有值。

    【讨论】:

      【解决方案2】:

      刚刚找到一个基于 RxPy 的实现。 最后一次更改是从 2018 年开始,它似乎还没有为 RxPY 3.x 做好准备。

      https://github.com/shyam-s00/ObservableCollections

      https://github.com/shyam-s00/ObservableCollections/issues/1

      from reactive.ObservableList import ObservableList
      
      ol = ObservableList([1, 2, 3, 4])
      ol.when_collection_changes() \
          .map(lambda x: x.Items) \
          .subscribe(print, print)
      
      ol.append(5)
      

      它提供

      • 可观察列表
      • ObservableDict
      • 可观察集

      另见https://github.com/ReactiveX/RxPY/issues/553

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-11-13
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多