【问题标题】:How to compose Observables to avoid the given nested and dependent callbacks?如何组合 Observables 以避免给定的嵌套和依赖回调?
【发布时间】:2015-04-08 18:08:45
【问题描述】:

this blog 中,他给出了回调地狱的this(复制/粘贴以下代码)示例。但是,没有提到如何通过使用响应式扩展来消除该问题。

所以这里 F3 取决于 F1 完成,而 F4 和 F5 取决于 F2 完成。

  1. 想知道 Rx 中的等效功能是什么。
  2. 如何在 Rx 中表示 F1、F2、F3、F4 和 F5 都应该异步拉取?

注意:我目前正试图围绕 Rx 进行思考,所以在问这个问题之前我没有尝试解决这个例子。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackB {

    /**
     * Demonstration of nested callbacks which then need to composes their responses together.
     * <p>
     * Various different approaches for composition can be done but eventually they end up relying upon
     * synchronization techniques such as the CountDownLatch used here or converge on callback design
     * changes similar to <a href="https://github.com/Netflix/RxJava">Rx</a>.
     */
    public static void run() throws Exception {
        final ExecutorService executor = new ThreadPoolExecutor(4, 4, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
        /* the following are used to synchronize and compose the asynchronous callbacks */
        final CountDownLatch latch = new CountDownLatch(3);
        final AtomicReference<String> f3Value = new AtomicReference<String>();
        final AtomicReference<Integer> f4Value = new AtomicReference<Integer>();
        final AtomicReference<Integer> f5Value = new AtomicReference<Integer>();

        try {
            // get f3 with dependent result from f1
            executor.execute(new CallToRemoteServiceA(new Callback<String>() {

                @Override
                public void call(String f1) {
                    executor.execute(new CallToRemoteServiceC(new Callback<String>() {

                        @Override
                        public void call(String f3) {
                            // we have f1 and f3 now need to compose with others
                            System.out.println("intermediate callback: " + f3 + " => " + ("f4 * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f3Value.set(f3);
                            latch.countDown();
                        }

                    }, f1));
                }

            }));

            // get f4/f5 after dependency f2 completes 
            executor.execute(new CallToRemoteServiceB(new Callback<Integer>() {

                @Override
                public void call(Integer f2) {
                    executor.execute(new CallToRemoteServiceD(new Callback<Integer>() {

                        @Override
                        public void call(Integer f4) {
                            // we have f2 and f4 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + (f4 + " * f5"));
                            // set to thread-safe variable accessible by external scope 
                            f4Value.set(f4);
                            latch.countDown();
                        }

                    }, f2));
                    executor.execute(new CallToRemoteServiceE(new Callback<Integer>() {

                        @Override
                        public void call(Integer f5) {
                            // we have f2 and f5 now need to compose with others
                            System.out.println("intermediate callback: f3" + " => " + ("f4 * " + f5));
                            // set to thread-safe variable accessible by external scope 
                            f5Value.set(f5);
                            latch.countDown();
                        }

                    }, f2));
                }

            }));

            /* we must wait for all callbacks to complete */
            latch.await();
            System.out.println(f3Value.get() + " => " + (f4Value.get() * f5Value.get()));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        try {
            run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static final class CallToRemoteServiceA implements Runnable {

        private final Callback<String> callback;

        private CallToRemoteServiceA(Callback<String> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseA");
        }
    }

    private static final class CallToRemoteServiceB implements Runnable {

        private final Callback<Integer> callback;

        private CallToRemoteServiceB(Callback<Integer> callback) {
            this.callback = callback;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(100);
        }
    }

    private static final class CallToRemoteServiceC implements Runnable {

        private final Callback<String> callback;
        private final String dependencyFromA;

        private CallToRemoteServiceC(Callback<String> callback, String dependencyFromA) {
            this.callback = callback;
            this.dependencyFromA = dependencyFromA;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call("responseB_" + dependencyFromA);
        }
    }

    private static final class CallToRemoteServiceD implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceD(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(140);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(40 + dependencyFromB);
        }
    }

    private static final class CallToRemoteServiceE implements Runnable {

        private final Callback<Integer> callback;
        private final Integer dependencyFromB;

        private CallToRemoteServiceE(Callback<Integer> callback, Integer dependencyFromB) {
            this.callback = callback;
            this.dependencyFromB = dependencyFromB;
        }

        @Override
        public void run() {
            // simulate fetching data from remote service
            try {
                Thread.sleep(55);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            callback.call(5000 + dependencyFromB);
        }
    }

    private static interface Callback<T> {
        public void call(T value);
    }
}

【问题讨论】:

  • 我不是 Rx 大师,但来自他提供的一个会话:您可以通过编写 Observables 来避免回调地狱。如果@benjchristensen 在附近-他也许可以提供更多详细信息。
  • @alfasin 我对 RxJava 的了解足够多,知道应该通过组合 observables 来解决它。这里的问题是如何组成:-)
  • 在这种情况下 - 我会将问题从“如何避免回调地狱?”到“如何组成 observables?” ;)
  • @alfasin 重新措辞了这个问题……你有答案吗?
  • @EntryLevelDev 是的。改变了它。谢谢。

标签: java future rx-java


【解决方案1】:

我是所引用的有关回调和 Java 期货的博文的原作者。下面是一个使用 flatMap、zip 和 merge 异步进行服务组合的示例。

它获取一个 User 对象,然后同时获取 Social 和 PersonalizedCatalog 数据,然后对于 PersonalizedCatalog 中的每个视频,同时获取一个书签、评级和元数据,将它们压缩在一起,并将所有响应合并为一个渐进式流输出作为服务器- 发送事件。

return getUser(userId).flatMap(user -> {
    Observable<Map<String, Object>> catalog = getPersonalizedCatalog(user)
            .flatMap(catalogList -> catalogList.videos().<Map<String, Object>> flatMap(
                    video -> {
                        Observable<Bookmark> bookmark = getBookmark(video);
                        Observable<Rating> rating = getRatings(video);
                        Observable<VideoMetadata> metadata = getVideoMetadata(video);
                        return Observable.zip(bookmark, rating, metadata, (b, r, m) -> combineVideoData(video, b, r, m));
                    }));

    Observable<Map<String, Object>> social = getSocial(user).map(s -> {
        return s.getDataAsMap();
    });

    return Observable.merge(catalog, social);
}).flatMap(data -> {
    String json = SimpleJson.mapToJson(data);
    return response.writeStringAndFlush("data: " + json + "\n");
});

这个例子可以在https://github.com/Netflix/ReactiveLab/blob/952362b89a4d4115ae0eecf0e73f273ecb27ba98/reactive-lab-gateway/src/main/java/io/reactivex/lab/gateway/routes/RouteForDeviceHome.java#L33的一个正常运行的应用程序的上下文中看到

由于我无法在此处提供所有信息,您还可以在 https://speakerdeck.com/benjchristensen/reactive-streams-with-rx-at-javaone-2014?slide=32 找到演示文稿形式的说明(带有视频链接)。

【讨论】:

  • 谢谢。我认为 getBookmark(...) 等实现在内部使用 subscribeOn(Schedulers.io()) 对吗?而且,zip 操作在调用线程上运行。
  • 是的,他们可以使用 subscribeOn 和 IO 调度程序来使阻塞 IO 异步。在 Netflix,我们通常使用 Hystrix 为我们处理这些问题,并提供舱壁、超时、回退、指标等。如果 IO 是非阻塞的,例如通过 Netty,那么 subscribeOn 是不必要的。
【解决方案2】:

根据您的代码。假设远程调用是使用Observable 完成的。

 Observable<Integer>  callRemoveServiceA()  { /* async call */  }

/* .... */

Observable<Integer>  callRemoveServiceE(Integer f2) { /* async call */  }

你想要什么:

  • 调用serviceA,然后调用serviceB,结果为serviceA
  • 调用serviceC,然后调用serviceDserviceE,结果为serviceC
  • 利用serviceEserviceD 的结果,构建一个新值
  • serviceB 的结果显示新值

使用 RxJava,您将通过以下代码实现:

Observable<Integer> f3 = callRemoveServiceA() // call serviceA
            // call serviceB with the result of serviceA
            .flatMap((f1) -> callRemoveServiceB(f1)); 


Observable<Integer> f4Andf5 = callRemoveServiceC() // call serviceC
                    // call serviceD and serviceE then build a new value
                    .flatMap((f2) -> callRemoveServiceD(f2).zipWith(callRemoveServiceE(f2), (f4, f5) -> f4 * f5));

// compute the string to display from f3, and the f4, f5 pair
f3.zipWith(f4Andf5, (childF3, childF4Andf5) -> childF3 + " => " + childF4Andf5)
            // display the value
            .subscribe(System.out::println);

这里重要的部分是flapMapzip(或zipWith)的使用

你可以在这里获得更多关于flapMap的信息:When do you use map vs flatMap in RxJava?

【讨论】:

  • R有了这个,我可以假设,一旦订阅,f4Andf5 可以比 f3 更快地执行。提供服务 A 和 B 需要更长的时间?我问是因为有了期货,服务 A 和 B 都应该在其他服务运行之前返回。
  • 是的,如果每次调用远程服务都是异步的。
猜你喜欢
  • 1970-01-01
  • 2020-09-28
  • 2020-03-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多