【问题标题】:How to handle exception and repeat with Rx如何处理异常并使用 Rx 重复
【发布时间】:2020-06-21 16:33:49
【问题描述】:

我正在使用Observable.FromAsync 执行IO 操作。我想永远重复此操作。我不明白如何处理异常,对它们做一些事情,然后返回我的循环:

我尝试过的:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);

现在我不明白为什么我的 observable 在第一个异常之后结束。我没有告诉它继续吗。

我想做什么:

  • 执行 IO 操作
  • 如果抛出返回一些自定义值
  • 重复循环

如果我的 observable 是可枚举的,我会想要这种行为:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}

即使OnError 被触发,我如何继续执行Repeat

Observable.CatchObservable.Throw应该如何与Observable.Repeat结合?

【问题讨论】:

  • 你能处理[something]里面的异常吗?
  • 我可以处理它,但出于好奇,它可以通过仅使用Rx 构造在外部处理吗?可以在 Error 之后构造一个新的 Observable 吗?如果是这样,Repeat 还有意义吗?我可以说myobservable.Catch(myobservable) 之类的话吗?

标签: c# observable system.reactive


【解决方案1】:

当你有一个 IObservable&lt;string&gt; ioObs = Observable.FromAsync&lt;string&gt;(Something); 时,你有一个 observable 可以返回一个值然后完成 ({OnNext}{OnCompleted}) 或者你有一个会引发异常的 observable ({OnError})。

让源在返回值或错误后重复执行非常简单。

IObservable<string> query = ioObs.Retry().Repeat();

.Retry() 说“如果出现错误,请再次执行”。 .Repeat() 表示“如果 observable 完成,请再次订阅”。

现在这有点危险,因为您已经生成了一个可以连续执行的 observable。你需要想办法阻止它。

您的选择是:

  • 处置订阅
  • 取一定数量的值(即.Take(n)
  • 设置.Timeout
  • 或使用TakeUntil

当您原来的 ioObs 在完成时返回 null 或空字符串时,最后一个很好。

你可以这样做:

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x == null);
        

这里有一段测试代码,你可以试试:

private int __counter = 0;
Task<string> Something()
{
    return Task.Run(() =>
    {
        if (Interlocked.Increment(ref __counter) % 7 == 0)
        {
            throw new Exception("Blam!");
        }
        return $"Hello World {__counter}";
    });
}

然后这样做:

IObservable<string> ioObs = Observable.FromAsync<string>(Something);

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x.EndsWith("19"));
        

当我订阅时,我得到:

你好世界 1 你好世界2 你好世界3 你好世界4 你好世界5 你好世界 6 你好世界 8 你好世界 9 你好世界 10 你好世界 11 你好世界 12 你好世界 13 你好世界 15 你好世界 16 你好世界 17 你好世界 18 你好世界 19

请注意,714 丢失了,因为那是引发异常的时间。

【讨论】:

    【解决方案2】:

    一旦抛出异常,observable 就死了。这与正常的方法执行相同。当抛出异常时,它会在 try-catch 块所在的位置进行处理。不可能“返回”最初生成异常的地方并从那里继续。与 observables 相同。

    Observable.Catch() 扩展方法可用于捕获此类异常(从现在已死的 observable 中)并继续处理新的 observable。您正在使用的重载Observable.Catch(IObservable&lt;TSource&gt;) 使用params 参数供IObservable 对象使用。当第一个因异常而失败时,跳转到第二个。如果那个失败,跳到第三个,依此类推。但是,您只提供了一个带有 Observable.Catch(ioObs) 的 observable,因此当第一个 observable 失败时,没有什么可跳转的。

    当您使用 other overload 时可能会出现这种情况,它需要使用可枚举的订阅。当一个失败时,将使用下一个。请参见以下示例:

    public static int requestCounter = 0;
    public static Task<string> SomeSourceMethod() {
        Random r = new Random();
        Thread.Sleep(50);
        if (r.NextDouble() < 0.5) {
            Console.Write("E ");
            throw new Exception();
        }
        return Task.FromResult($"test {requestCounter++}");
    }
        
    public static IEnumerable<IObservable<string>> ObservableEnumerable() {
        for (;;) {
            IObservable<string> foo = Observable.FromAsync<string>(SomeSourceMethod);
            IObservable<string> fooRepeated = foo.Repeat();
            yield return fooRepeated;
        }
    }
    
    static void Main(string[] args)
    {
        IObservable<string> endless = ObservableEnumerable().Catch();
        IDisposable subscription = endless
            .SubscribeOn(NewThreadScheduler.Default)
            .Subscribe(it => Console.WriteLine(it));
    
        Thread.Sleep(1000);
        Console.WriteLine("Terminating the subscription");
        subscription.Dispose();
    }
    

    这可能会产生如下输出:

    E test 1
    E E E E test 2
    test 3
    test 4
    test 5
    test 6
    E E test 7
    test 8
    E test 9
    Terminating the subscription
    E E E E E E E E E E E E E E E E E E E 
    

    但是它不能正常工作,因为我不得不硬终止进程才能停止。它有点(?)以某种方式工作,但我不确定这样做是否是正确的方法(可能不是)。

    【讨论】:

    • 所以你是说基本上当我的 observable 死了时,我需要生成一个新的,然后将订阅者附加到它。我不明白的是你只是使用 Catch 而不提供可观察到的延续(即使您的Observable 是无穷无尽的)。所以你yield-ing observables 但如果失败了Catch 块如何帮助你?另外,既然您获得了IEnumerable,那么您如何使用SubscribeOn
    • @BercoviciAdrian ObservableEnumerable() 正在返回可枚举的 IObservable&lt;string&gt; 实例。 Catch() 扩展方法将使用此枚举并开始使用该枚举中的第一个可观察对象。当它失败时,它使用可枚举中的下一个。因此,subscription 变量将有一个活动订阅,它打印生成的字符串,但实际上底层源失败了 50% 并出现异常,这是由 Catch() 调用处理的(跳转到下一个 observable)。
    • 所以基本上你创建了一个可观察对象工厂,并在第一个死掉时使用Catch 从一个跳到另一个。
    • 使用Task.FromResult(...) 在返回Task&lt;T&gt; 的方法中抛出错误是很危险的。整个方法应该包含在 Task.Run(() =&gt; ...) 中,并从那里抛出异常。它极大地改变了 observable 的运行方式。如果你改变你的方法然后你会发现你不会在Terminating the subscription之后得到E E E E E E E E E E E E E E E E E E E 。然后 observable 将正确终止。
    • 使用for (;;) { ... yield } 也很麻烦。这是个坏主意。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-12-13
    • 2011-01-07
    • 1970-01-01
    • 1970-01-01
    • 2014-01-06
    • 1970-01-01
    相关资源
    最近更新 更多