【问题标题】:Use of AtomicBoolean required in RetryWhenRetryWhen 中需要使用 AtomicBoolean
【发布时间】:2018-11-04 09:44:01
【问题描述】:

Observable.retryWhen 的Javadoc 示例中,AtomicInteger 用于counter,而不是更简单的常规Int。这真的有必要吗?在什么情况下errors 可以在不同的线程上发出?

我阅读文档和源代码表明 takeWhileflatMap 闭包始终保证在同一线程上运行。

http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#retryWhen-io.reactivex.functions.Function-

Observable.timer(1, TimeUnit.SECONDS)
     .doOnSubscribe(s -> System.out.println("subscribing"))
     .map(v -> { throw new RuntimeException(); })
     .retryWhen(errors -> {
         AtomicInteger counter = new AtomicInteger();
         return errors
                   .takeWhile(e -> counter.getAndIncrement() != 3)
                   .flatMap(e -> {
                       System.out.println("delay retry by " + counter.get() + " second(s)");
                       return Observable.timer(counter.get(), TimeUnit.SECONDS);
                   });
     })
     .blockingSubscribe(System.out::println, System.out::println);

【问题讨论】:

  • 如果是 int 代码将无法编译,因为捕获的局部变量必须是有效的 final。
  • 在 Kotlin 中这不是问题。只需声明为var counter = 0
  • 当然,但是您发布的是 Java,而不是 Kotlin。

标签: kotlin thread-safety rx-java rx-java2 reactivex


【解决方案1】:

这不是绝对必要的,但有些人在看到用于计数器的单元素 int 数组时会心脏病发作,因此AtomicInteger

 Observable.timer(1, TimeUnit.SECONDS)
 .doOnSubscribe(s -> System.out.println("subscribing"))
 .map(v -> { throw new RuntimeException(); })
 .retryWhen(errors -> {
     int[] counter = { 0 };
     return errors
               .takeWhile(e -> counter[0]++ != 3)
               .flatMap(e -> {
                   System.out.println("delay retry by " + counter[0] + " second(s)");
                   return Observable.timer(counter[0], TimeUnit.SECONDS);
               });
 })
 .blockingSubscribe(System.out::println, System.out::println);

什么情况下错误会在不同的线程上发出?

处理程序序列可以有自己的线程,因此,每当您共享对可变状态的外部访问时,您应该确保访问是线程安全的。同样,在示例中,没有必要,因为计数器使用期间的特定组合在单个线程上运行并保证不会重叠自身,因为任何新错误只能在当前序列发出重试信号后发生。

【讨论】:

  • 即使errors 是一个任意的可观察对象,Reactive 流规范是否不能保证这些回调被串行调用?就像您永远不必担心在 onNext 回调中同步代码,因为它总是被串行调用。
  • 计数器在错误处理序列之外,因此您必须担心并发访问。
  • 如果takeWhileflatMap 总是被串行调用(一个线程永远不能在takeWhile 和另一个线程在flatMaptakeWhileflatMap 中的多个线程同时)比我不明白为什么我们必须担心并发访问。我只是想确保我了解 Rx 中的线程安全。
  • Javadoc 的目的是通过一个可立即运行的示例帮助用户理解运算符。用户倾向于复制粘贴此类示例,根据需要对其进行修改,并且鉴于使用了 AtomicInteger,他们不太可能遇到计数器的任何问题。如果有足够的经验,可以推断出保证同步性等属性,并可以优化代码以利用它。
猜你喜欢
  • 2011-05-28
  • 2020-10-11
  • 2013-01-30
  • 2018-06-13
  • 2019-01-20
  • 2016-06-21
  • 1970-01-01
  • 2015-06-19
  • 2012-01-03
相关资源
最近更新 更多