【问题标题】:Calling network services in parallel using RxJava. Is this the right way?使用 RxJava 并行调用网络服务。这是正确的方法吗?
【发布时间】:2015-01-18 00:25:58
【问题描述】:

想法是并行进行 3 个网络调用。 (我使用谷歌作为演示目的的服务。以下工作但不确定这是正确的方法还是可以简化。如果我必须结合所有三个搜索的响应我该怎么办?请指教。

public class GoogleSearchRx
{
    public static void main(String args[])
    {
        CountDownLatch latch = new CountDownLatch(3);

        search("RxJava").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        search("Reactive Extensions").subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        //run the last one on current thread
        search("Erik Meijer").subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );

        try
        {
            latch.await();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }

    public static Observable<Elements> search(String q)
    {
        String google = "http://www.google.com/search?q=";

        String charset = "UTF-8";
        String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // Change this to your company's name and bot homepage!

        return Observable.create(new Observable.OnSubscribe<Elements>()
        {

            @Override public void call(Subscriber<? super Elements> subscriber)
            {
                out.println(currentThreadName() + "\tOnSubscribe.call");

                try
                {
                    Elements links = Jsoup.connect(google + URLEncoder.encode(q, charset)).timeout(1000).userAgent(userAgent).get().select("li.g>h3>a");
                    subscriber.onNext(links);
                }
                catch (IOException e)
                {
                    subscriber.onError(e);
                }
                subscriber.onCompleted();
            }
        });
    }
}

【问题讨论】:

    标签: system.reactive reactive-programming rx-java


    【解决方案1】:

    通过问题的“结合所有三个搜索的响应”部分,您可能正在寻找Zip

    Observable<Elements> search1 = search("RxJava");
    Observable<Elements> search2 = search("Reactive Extensions");
    Observable<Elements> search3 = search("Eric Meijer");
    Observable.zip(searc1, search2, search3,
                new Func3<Elements, Elements, Elements, Elements>() {
                    @Override
                    public Elements call(Elements result1, Elements result2, Elements result3) {
                        // Add all the results together...
                        return results;
                    }
                }
        ).subscribeOn(Schedulers.io()).subscribe(
                links -> {
                    links.forEach(link -> out.println(currentThreadName() + "\t" + link.text()));
                    latch.countDown();
                },
                e -> {
                    out.println(currentThreadName() + "\t" + "Failed: " + e.getMessage());
                    latch.countDown();
                }
        );
    

    这假设您想同时处理所有结果(在订阅者中,这里),而不关心给定结果使用了哪个查询。

    请注意,zip 函数有不同版本,从 1..N 个 observables 到 Func1Func9FuncN,允许您压缩特定或任意数量的 observables。

    【讨论】:

    • 嗨@AdamS,如何处理其中一个调用失败/抛出异常的情况?
    • 这样第二次调用会在第一次调用返回后发生。要解决此问题,请使用: Observable search1 = search("RxJava").subscribeOn(Schedulers.io()); Observable search2 = search("Reactive Extensions").subscribeOn(Schedulers.io()); Observable search3 = search("Eric Meijer").subscribeOn(Schedulers.io());
    【解决方案2】:

    这是处理整个过程(包括 Jsoup 调用)的另一种方法,避免了任何倒计时锁存器,并提供了一种避免使用 Observable.create 的方法(因为让 Rx 操作员处理所有订阅者要容易得多管理的东西!)

    (“餐巾纸”代码,可能需要一些轻推才能编译。)

    final String google = "http://www.google.com/search?q=";
    
    final String charset = "UTF-8";
    final String userAgent = "ExampleBot 1.0 (+http://example.com/bot)"; // ...
    Observable.just("RxJava", "Reactive Extensions", "Erik Meijer")
        .flatMap((query) -> Observable.defer(() -> {
            try {
                return Observable.from(Jsoup.connect(google + URLEncoder.encode(query, charset))
                    .timeout(1000)
                    .userAgent(userAgent)
                    .get()
                    .select("li.g>h3>a")).subscribeOn(Schedulers.io());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }))
        .forEach(
            (link) -> out.println(link.text()),
            (e) -> out.println("Failed: " + e.getMessage()));
    

    请注意,与您的原始示例一样,不能保证对此进行排序。一种处理方法是toSortedList,它要么期望实现Comparable 的项目的Observable,要么期望Func2 提供元素之间的比较。

    【讨论】:

    • 你为什么使用“defer()”以及Scheduler.io()?
    • 我打电话给defer 因为Jsoup.connect 似乎是一个阻塞电话。 defer 在订阅之前不会运行内部函数来创建Observable
    • 订阅io仅仅意味着抓取请求的工作将在Rx的线程池中完成,用于io操作,例如磁盘访问或网络调用。
    • 好的。所以如果你不调用 defer() 那么“Jsoup.connect()”会立即执行吗?如果是这样,这是否意味着所有 3 个搜索一个接一个地发生而不是同时发生?
    • 这取决于flatMap 的订阅工作在哪里完成,您可以对其进行调整。如果您希望事情明确地按顺序完成,concatMap 将是一个不错的选择(在此处直接替换 flatMap),它维护一个 SerialSubscription 并且一次只订阅一个传入的 observable。但是,如果在这些情况下没有 defer,处理传入的字符串以创建和缓冲 observable 以进行下一步将阻塞请求,因此 defer 实际上在 concatMap 情况下也很好。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-10-03
    • 1970-01-01
    • 1970-01-01
    • 2015-11-24
    • 2013-01-12
    • 2022-10-25
    • 1970-01-01
    相关资源
    最近更新 更多