【问题标题】:RxJava Observable timeout before first element第一个元素之前的 RxJava Observable 超时
【发布时间】:2017-12-04 16:30:52
【问题描述】:

我有向我发送 ping 的设备,为此我使用 observable。但在第一次 ping 之前,我们开始连接,这需要一些时间。因此我希望第一次 ping 有 10 秒超时。我是这样设计的:

public Observable<Ping> getPing() {
    ConnectableObservable<Ping> observable = device.connectToDevice().publish();

    Observable<Ping> firstWithTimeout = observable.take(1).timeout(10, TimeUnit.SECONDS);
    Observable<Ping> fromSecondWithoutTimeout = observable.skip(1);

    Observable<Ping> mergedObservable = firstWithTimeout.mergeWith(fromSecondWithoutTimeout)
            .doOnDispose(() -> disconnect(bluetoothDevice))
            .doOnError(error -> disconnect(bluetoothDevice));

    observable.connect();
    return mergedObservable;
}

为了测试我使用

Subject<Ping> observable = PublishSubject.create();
when(device.connect()).thenReturn(observable);
TestObserver<Ping> testSubscriber = TestObserver.create();
getPing.subscribe(testSubscriber);
observable.onNext(new Ping());

testSubscriber.assertValueCount(1);

尽管我立即发送 ping,但由于 TimeoutException,此测试将失败。

【问题讨论】:

  • 你解决了吗?我也有同样的问题。

标签: java android rx-java observable rx-java2


【解决方案1】:

有一个重载的timeout 运算符非常适合这里:

timeout(ObservableSource&lt;U&gt; firstTimeoutIndicator, Function&lt;? super T, ? extends ObservableSource&lt;V&gt;&gt; itemTimeoutIndicator)

假设您的可观察参考是testObserable,您只需执行以下操作:

testObservable.timeout(
        Observable.timer(5L, TimeUnit.SECONDS), // here you set first item timeout
        ignored -> Observable.never() // for other elements there is no time function
)

【讨论】:

    【解决方案2】:

    请看一下这个设置:

    JUnit5 / RxJava2

    我认为你错误的模拟配置错误

    when(device.connect()).thenReturn(observable);

    请看看我的实现。当您为每个方法调用创建新的可观察对象时,无需使用发布/连接。在设备中使用 autoConnect 方法实现 connectToDevice()

      Device device;
    
      @BeforeEach
      void setUp() {
        device = mock(Device.class);
      }
    
      @Test
      void name() throws Exception {
        Subject<Ping> observable = PublishSubject.create();
    
        when(device.connectToDevice()).thenReturn(observable);
    
        TestObserver<Ping> test = getPing(Schedulers.computation()).test();
        observable.onNext(new Ping());
    
        test.assertValueCount(1);
      }
    
      @Test
      void name2() throws Exception {
        Subject<Ping> observable = PublishSubject.create();
    
        when(device.connectToDevice()).thenReturn(observable);
    
        TestScheduler testScheduler = new TestScheduler();
        TestObserver<Ping> test = getPing(testScheduler).test();
    
        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
    
        observable.onNext(new Ping());
    
        test.assertError(TimeoutException.class);
      }
    
      private Observable<Ping> getPing(Scheduler scheduler) {
    
        return device
            .connectToDevice()
            .take(1)
            .timeout(10, TimeUnit.SECONDS, scheduler)
            .doOnDispose(() -> disconnect())
            .doOnError(error -> disconnect());
      }
    
      private void disconnect() {}
    
      interface Device {
        Observable<Ping> connectToDevice();
      }
    
      class Ping {}
    

    【讨论】:

    • observable.take(1) 将调用 onComplete() 并在第一次收到值后自动处理
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多