【问题标题】:Construct Observer/Observable pattern using RxCpp使用 RxCpp 构造 Observer/Observable 模式
【发布时间】:2017-11-13 18:54:21
【问题描述】:

我正在尝试在Rx-cpp 中实现observer/observable 模式。这是一个非常有趣的tutorial in Rx.Net 关于某人如何做到这一点。

在这个C# 示例中,我们必须覆盖特定的interfaces

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}


public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

据我了解,Rx-cpp 没有这样的便利。那么,是否可以为我提供一些标题示例(myObservable.h/myObserver.h),类似于上面的interfaces,我可以将其用作定义相同通信模式的指导?

非常感谢任何帮助, 谢谢!

编辑 1: 感谢@zentrunix,我正在尝试进行面向课堂的交流。到目前为止,我有以下可观察模式的代码。我想要的是定义一个观察者列表,将我附加到可观察对象中,当调用OnNext 时,应该通知这些观察者。但是,有一些缺失的部分。

  1. 当调用 myObservable::Subscribe() 函数时,我如何在这些观察者 (Rx::subscribers&lt;int&gt;) 上 subscribe()
  2. 还有我怎么unsubscribe()
  3. 最后,如何在多个onNext 观察者中对应o.subscribe(onNext, onEnd);?是否可以构造一个对应的myObserver 类? (再次受到here 的启发)
  4. 对不起,这样的组织有意义吗?到目前为止,我一直在使用tutorial 中提供的架构,这就是我痴迷于这项任务的原因。我发现这是参与RxCpp 的一种方式。任何 cmet 都非常感谢。(再次为我的无知道歉。)

    class myObservable {
    
    private:
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;
    
    public:
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); };
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
    
        return Rx::observable<>::create<int>([&, out]() {
            auto it = observers->insert(observers->end(), *out);
            it->add([=]() {
                observers->erase(it);
            });
        });
    
    };
    
    void OnNext(int sendItem) {
    
        for (Rx::subscriber<int> observer : *observers) {
            (observer).on_next(sendItem);
        }
    }
    
    void Disposer(Rx::subscriber<int> out) {
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
    };
    };
    

【问题讨论】:

  • 我要做的是构造两个继承RxCppobserverobservable函数的类。在Rx.Net 示例中,这是由class myIObserver : IObserver&lt;in Τ&gt; 完成的。这不是如何将类转换为virtual,而是如何构造与SubjectObserver : IObserver&lt;Subject&gt;SubjectObservable : IObservable&lt;Subject&gt; 执行相同功能的myObserver.hmyObservable.h。感谢他的兴趣。
  • 我附上c#interfaces 的原因是因为我认为应该实现相同的功能(OnComplete()OnNext(T value) 等),但我没有找到其virtual
  • 我认为您的问题需要重新表述。另外,我刚刚查看了一些 RxCpp 源(例如github.com/Reactive-Extensions/RxCpp/blob/master/Rx/v2/src/…),并没有看到任何看起来像接口的东西。我认为您可能以完全错误的方式处理此问题。
  • 您的MyObserver 可能应该实现一些方法,如OnNext,并有一个成员rxcpp::observer&lt;some template arguments&gt; m_rxObserver,您可以通过将适当的包装器传递给这些方法来构造该成员。肯定有一些 rxcpp 代码示例可供您使用?
  • 很好,我试过class myIObserver : public Rx::observer&lt;Type T&gt;,但没有virtual 函数。我也试过class myIObserver : public Rx::virtual_observer&lt;Type T&gt;,但我不知道这没关系。此外,我在没有任何虚函数的observable 模式中存在严重问题。再次感谢。

标签: c# c++ system.reactive observer-pattern rxcpp


【解决方案1】:

下面是一个非常简单的 RxCpp 示例。 不过(至少)有一个警告:典型的 RxCpp 代码大量使用 lambda,我非常不喜欢。

我也尝试在 Internet 上查找文档和教程,但找不到。我对线程模型的解释特别感兴趣。

如果您愿意浏览代码和 Doxygen 文档,RxCpp GitHub 站点中有很多示例。

#include <iostream>
#include <exception>

#include "rxcpp/rx.hpp"
namespace rx = rxcpp;

static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }

static void onError(std::exception_ptr ep)
{
  try { std::rethrow_exception(ep); }
  catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}

static void observableImpl(rx::subscriber<int> s)
{
  s.on_next(1);
  s.on_next(2);
  s.on_completed();
}

int main()
{
  auto o = rxcpp::observable<>::create<int>(observableImpl);
  std::cout << "*\n";
  o.subscribe(onNext, onEnd);
} 

【讨论】:

    猜你喜欢
    • 2011-09-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-08
    • 2017-11-13
    • 1970-01-01
    • 2016-07-06
    • 1970-01-01
    相关资源
    最近更新 更多