【问题标题】:Chain Observables链 Observables
【发布时间】:2017-01-01 14:20:34
【问题描述】:

我有一个对象集合,称它们为obj。他们有一个act() 方法。 act() 方法最终将导致 o 上的 event() observable 调用 onComplete

什么是链接这些的好方法?

即调用o.act(),等待o.event().onComplete,然后调用下一个o2.act(),以此类推,无限数量的o在集合中。

所以签名是这样的:

public class Item {
    final protected PublishSubject<Object> event = PublishSubject.create();

    public Observable<ReturnType> event() {
        return event;
    }

    public void act() {
        // do a bunch of stuff
        event.onComplete();
    }
}

然后在消费代码中:

Collection<Item> items...
foreach item in items
  item.act -> await item.event().onComplete() -> call next item.act() -> so on

【问题讨论】:

  • 是否需要在o2.act()中返回o.act()?如果没有,你能异步运行所有这些吗?
  • @Sebas o2, o3 等每个都可能取决于先前执行的副作用(但不是实际的返回值)
  • 那么,顺序很重要,对吗?
  • @Sebas - 订单很重要,是的
  • 然后检查一下,它可能会帮助stackoverflow.com/a/26936234/1291428 - 似乎switchmap 会阻止您的可观察对象在之前的结果完成之前发出它们的结果

标签: java rx-java reactivex


【解决方案1】:

如果我理解正确,您的对象具有这种签名:

public class Item {
    public Observable<ReturnType> event()...
    public ReturnType act()...
}

所以如果他们是这样填写的:

public class Item {

    private final String data;
    private final Observable<ReturnType> event;

    public Item(String data) {
        this.data = data;

        event = Observable
                .fromCallable(this::act);
    }

    public Observable<ReturnType> event() {
        return event;
    }

    public ReturnType act() {
        System.out.println("Item.act: " + data);
        return new ReturnType();
    }
}

然后他们可以像这样被链接起来:

Item item1 = new Item("a");
Item item2 = new Item("b");
Item item3 = new Item("c");

item1.event()
        .concatWith(item2.event())
        .concatWith(item3.event())
        .subscribe();

结果:

Item.act: a
Item.act: b
Item.act: c

如果你有一个Iterable 集合,你可以使用flatMap

Iterable<Item> items = Arrays.asList(item1, item2, item3);

Observable.from(items)
        .flatMap(Item::event)
        .subscribe();

另类

与您的情况更相似的替代方案可能是:

public class Item {
    private final PublishSubject<Void> event = PublishSubject.create();

    private final String data;

    public Item(String data) {
        this.data = data;
    }

    public Observable<Void> event() {
        return event;
    }

    public Void act() {
        System.out.println("Item.act: " + data);
        // do a bunch of stuff
        event.onCompleted();
        return null;
    }
}

用法:

Iterable<Item> iterable = Arrays.asList(item2, item3);

item1.event().
        concatWith(Observable.from(iterable)
                .map(Item::act))
        .subscribe();

item1.act();

但它不会在第 2 项以后使用 event()

【讨论】:

  • 这真的很接近,抱歉细节不足。我使用您的 Item 类更新了问题。谢谢
  • 所以你想通过调用第一个act来开始这一切?
  • 没错,没错!
  • 我认为如果没有观察者,工作不做是一种更好的模式。这就是为什么工作通常在订阅时开始。以一种方式为第一项启动它,而另一种方式为后续项目启动它似乎很臭。你有什么理由需要这种方式?
  • item.act() 中的活动是固有“热”的系统进程。前面的进程可能会做一些事情,比如创建后面的依赖的目录。
猜你喜欢
  • 2019-08-13
  • 1970-01-01
  • 2018-01-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多