【问题标题】:Observable.from(something that needs to be done on another thread)Observable.from(需要在另一个线程上完成的事情)
【发布时间】:2015-05-02 16:47:54
【问题描述】:

我正在学习 Rx-java,遇到了一个小问题。 我正在尝试获取 Object 列表并将其传递给 Observable.from() 以便我可以处理它。

问题:这个列表需要在另一个线程上获取(内部有 http 调用),所以 Observable.from(getList()) 什么也没给我。

我尝试过类似的方法:

Observable.create(new Observable.OnSubscribe>() { @覆盖 公共无效调用(订阅者>订阅者){ 订阅者.onNext(getList()); 订阅者.onCompleted(); } }).订阅...

但是这个订阅是在一个 Iterable 上的,onNext 只传递完整的列表,而不是列表中的每个 OIbject。 我错过了什么?我怎么做 ? 谢谢

更新

这是我正在尝试做的事情(以及我如何使用 toSortedList):

Observable.from(getList())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .toSortedList(new Func2<Object, Object, Integer>() {
      @Override
      public Integer call(Object left, Object right) {
        if (left.equals(right))
          return 0;
        return left.getLabel(pm).compareToIgnoreCase(right.getLabel(pm));
      }
    })
    .subscribe(new Observer<List<Object>>() {
//On next sends a List of objects I'm using
}

问题是 getList() 需要在另一个线程上调用(它返回一个 Iterable,而不是 Observable)。 也许我没有正确使用您的第一个解决方案 dwursteisen。

更新 2

这是我不明白为什么它不起作用的东西:

Observable.just(0)
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap(new Func1<Integer, Observable<Object>>() {
      @Override
      public Observable<Object> call(Integer integer) {
        return Observable.from(getList());
      }
    })
    .toSortedList(new Func2<Object, Object, Integer>() {
      @Override
      public Integer call(Object left, Object right) {
        if (left.equals(right))
          return 0;
        return left.getLabel(pm).compareToIgnoreCase(right.getLabel(pm));
      }
    })
    .subscribe( ... )

或者通过订阅替换 flatMap 调用并在 onNext 中执行所有操作,它仍然在 mainThread 上调用 getList() ...

【问题讨论】:

    标签: java rx-java


    【解决方案1】:

    如果你想使用特定的线程,你可以使用subscribeOn方法来指定使用哪个“tread”(调度器)。

    sourceObservable.subscribeOn(httpScheduler)
                    .flatMap(Observable::from)
                    .subscribe();
    

    如果它不起作用,您仍然可以使用您的解决方案。但只需迭代您的列表:

    Observable.create(new Observable.OnSubscribe<CategorizedActivityInfoWrapper>() {
      @Override
      public void call(Subscriber<? super CategorizedActivityInfoWrapper> subscriber) {
         for(o : getList()) {
             subscriber.onNext(o);
         }
    
         subscriber.onCompleted();
      }
    }).subscribe ...
    

    更新

    Observable.from(getList()).subscribe();
    

    也可以这样写:

        List objs = getList();
        Observable.from(objs).subscribe();
    

    所以,getList() 将在当前线程中被调用。您将无法控制将在哪个线程中调用 getList()

    【讨论】:

    • 实际上这些解决方案都没有给我想要的东西。在 from() 之后,我调用了 toSortedList(),这不适用于我自己创建的订阅者(就像你提出的第二个解决方案一样)。实际上,我知道的唯一方法是调用 Observable.just(0),调用我的 HTTPcall onNext 并发送一个 Observable onComplete。威奇很丑。也许它可以通过将 Future 传递给 from() 来工作,但我现在不知道该怎么做。
    • 1) 你如何使用 toSortedList() ?它甚至可以与自定义的 observable 一起使用(但对象可能是可比较的)。即:sourceObservable.flatMap(Observable::from).toSortedList().subscribe();什么不工作? 2) Observable.just(0) 将调用 onNext 和 onCompleted。您尝试实现的目标与您的客户 Observable 相同。 3) Observable.from(aFuture) 会调用 Future.get 方法然后阻塞。
    【解决方案2】:

    您需要将异步计算放在observeOn 之前(也可以选择在subscribeOn 之后):

    public class AsyncStartOtherThread {
        public static void main(String[] args) throws Exception {
            CountDownLatch cdl = new CountDownLatch(1);
            Observable.just(1).subscribeOn(Schedulers.newThread())
            .map(e -> {
                System.out.println(Thread.currentThread());
                return Arrays.asList(1, 2, 3, 4, 5);
            })
            .observeOn(Schedulers.computation())
            .doOnNext(e -> System.out.println(Thread.currentThread()))
            .flatMap(Observable::from)
            .toSortedList((a, b) -> Integer.compare(b, a))
            .subscribe(System.out::println, Throwable::printStackTrace, () -> { System.out.println("Done"); cdl.countDown(); });
            ;
            cdl.await();
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-09-29
      • 2020-12-15
      • 1970-01-01
      • 2021-06-21
      • 1970-01-01
      • 2011-03-31
      相关资源
      最近更新 更多