【问题标题】:ReactiveX: Make Observable.Create() be called only onceReactiveX:使 Observable.Create() 只被调用一次
【发布时间】:2017-03-30 04:57:31
【问题描述】:

我正在尝试使用 ReactiveX(更准确地说是 Rx.Net)和 SQLite.Net 构建数据访问层。

部分工作是创建一个返回数据库连接的可观察对象,以便仅在需要时才可以延迟打开它。到目前为止,这是我想出的:

var connection = Observable.Create<SQLiteConnection>(observer =>
{
    Debug.WriteLine("CheckInStore: Opening database connection");
    var database = new SQLiteConnection(configuration.ConnectionString.DatabasePath);

    observer.OnNext(database);
    observer.OnCompleted();

    return Disposable.Create(() =>
    {
        Debug.WriteLine("CheckInStore: Closing database connection");
        database.Close();
    });
});


// Further down the line, a query would look like this:
var objects = connection.SelectMany(db => db.Query<>("select * from MyTable"));

不幸的是,每次有人订阅这个 observable 时,都会创建一个新的连接。一旦订阅被处理,它也会关闭。

我尝试使用.Replay(1).RefCount(),但它并没有改变任何东西。无论如何,我不确定是否理解整个 RefCount 的事情。

如何使这个数据库连接成为单例?

【问题讨论】:

    标签: c# database system.reactive


    【解决方案1】:

    看看这段代码,它是等效的,但没有打开数据库连接:

    var conn = Observable.Create<int>(o =>
        {
            Debug.WriteLine("Opening");
            o.OnNext(1);
            o.OnCompleted(); //This forces closing code to be called. Comment me out.
            return Disposable.Create(() =>
            {
                Debug.WriteLine("Closing");
            });
        })
        //.Replay(1)
        //.RefCount() //.Replay(1).RefCount is necessary if you want to cache the result
        ;
    
    var sub1 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"1: {i}"));
    var sub2 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"2: {i}"));
    sub1.Dispose();
    sub2.Dispose();
    var sub3 = conn.SelectMany(i => Observable.Return(i)).Subscribe(i => Debug.WriteLine($"3: {i}"));
    sub3.Dispose();
    

    这里有很多问题:

    1. 每次您取消订阅或完成 observable 时,都会调用您的处置/取消订阅代码。由于您拨打的是OnCompleted,因此每次都会打开/关闭。
    2. 如果你想重复使用同一个连接,你需要使用.Replay(1).RefCount()。每次订阅者连接时,Observable.Create 都会运行整个函数,没有任何东西(.Replay(1).Refcount() 除外)为您缓存它。
    3. 即使您添加了.Replay(1).Refcount() 并删除了OnCompleted,如果没有未完成的订阅(例如在sub2.Dispose() 调用之后),您仍然会获得处置(即DB-Closed)行为。
    4. 如果您不通过using(var sub = connection.SelectMany(...)) 或明确通过sub.Dispose() 处理订阅,您将永远不会取消订阅,因为此 Observable 无法终止。换句话说,与 3 的相反问题,你的Close 代码永远不会发生。

    我希望你明白:这是一种非常容易出错的做事方式。我会推荐一个简单的迭代调用,因为无论如何这往往对数据库调用更有效。如果您坚持使用 RX,我会查看 Observable.Using 进行数据库连接初始化。

    【讨论】:

      猜你喜欢
      • 2021-01-04
      • 2019-08-23
      • 2017-09-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多