【问题标题】:Manage identical requests with RxJava使用 RxJava 管理相同的请求
【发布时间】:2015-04-16 18:16:29
【问题描述】:

假设我有一个从单独线程上的给定链接获取图像的提取器。然后图像将被缓存在内存中。一旦图像已经被缓存,获取器将不会重新获取链接。 fetcher 被认为是一个 Observable。可能有很多订阅者向获取者索要图片。在第一个订阅者订阅 fetcher 后,fetcher 将拍摄网络。但是,如果有第二个订阅者来订阅,那么获取器不应该在它之前已经获取一个请求时再发送另一个请求。之后,如果获取完成,两个订阅者都将获得图像。现在,如果有第三个订阅者来了,提取器将立即发出图像。

如何使用 RxJava 方法实现上述场景?我期望的是利用某种现有的运算符,以更具声明性的方式组合它们,最重要的是,避免同步、锁定和原子的开销。

【问题讨论】:

  • 这个场景听起来像图片加载库。在 fetcher 下载图像后,该图像将作为键值对缓存在内存中,其中 url 为键。因此,其他想要获取相同图像的提取器将使用给定的 url 检查地图。通过条件检查(如 null 或不完整),它会或不会启动新的 http 请求。至于发射,除非您想保留图像,否则它是按需进行的 UI 操作。如果 ImageView 没有需要图像的需求,则不加载。
  • 另外,我会使用像 Picasso 这样的图像加载库并让它处理缓存(我认为仅内存缓存图像不会让你走得太远,因为 Android 设备的内存有限)。如果您愿意,可以使用 Picasso 的 Targets 将 Picasso 包裹在 Observable 中。

标签: android reactive-programming rx-java


【解决方案1】:

线索在问题中:“假设我有一个获取器,它从单独线程上的给定链接获取图像。然后图像将缓存在内存中。”

答案是cache() 运算符:

“记住 Observable 发出的项目的序列,并将相同的序列发送给未来的订阅者”

来自:https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators

所以,下面的Observable 应该只获取一次图片,不管Subscribers 如何订阅它:

Observable<Bitmap> cachedBitmap = fetchBitmapFrom(url).cache();

编辑:

我认为下面的例子证明了上游Observable 只被订阅了一次,即使在Observable 发出任何东西之前有多个订阅进入。对于网络请求也应该如此。

package com.example;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class SimpleCacheTest {

    public static void main(String[] args) {
        final Observable<Integer> cachedSomething = getSomething().cache();

        System.out.println("before first subscription");
        cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("1"));

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("before second subscription");
        cachedSomething.subscribe(new SimpleLoggingSubscriber<Integer>("2"));

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("quit");
    }

    private static class SimpleLoggingSubscriber<T> extends Subscriber<T> {

        private final String tag;

        public SimpleLoggingSubscriber(final String tag) {
            this.tag = tag;
        }
        @Override
        public void onCompleted() {
            System.out.println("onCompleted (" + tag + ")");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError (" + tag + ")");
        }

        @Override
        public void onNext(T t) {
            System.out.println("onNext (" + tag + "): " + t);
        }
    }

    private static Observable<Integer> getSomething() {
        return Observable.create(new Observable.OnSubscribe<Integer>(){

            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("going to sleep now...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io());
    }
}

输出:

before first subscription
going to sleep now...
before second subscription
onNext (1): 1
onNext (2): 1
onCompleted (1)
onCompleted (2)
quit

【讨论】:

  • 这不会过滤掉正在进行的请求 - 如果您的第二个图像请求在第一个请求返回之前出现。事实上,它根本不会阻止请求多次运行——这个解决方案假设 url->cache 映射在其他地方完成。
  • 实际上,我认为这不是真的 - 请看一下我添加的示例代码。还是我缺少什么?
  • 好的,我想我们看待问题的方式不同。如果您已经有一个您知道为 URL 提供图像的 Observable(订阅时),则此方法有效。但是,如果您只知道需要为 URL 获取图像(并且您想抽象该图像的获取和缓存),那么 Rx 可能不是一个好的解决方案——尤其是在 Android 上。
  • 再次重新阅读问题,很难说 - 如果您为图像构建可观察对象然后保留对它的引用(如您的示例中),了解您'将始终具有相同的 URL。如果 URL 发生变化,那么您的 Observable 也会发生变化,这就会分崩离析——您必须为不同的 URL 缓存对 Observables 的引用,每个 URL 本身都在缓存图像……哎呀。
  • @Adam S:是的,你是对的 - 使用 cache() 确实需要将所有(缓存的)Observables 保存在 Map&lt;Url, Observable&lt;Bitmap&gt;&gt; 或类似的东西中。
【解决方案2】:

这可以通过 ConcurrentMap 和 AsyncSubject 来完成:

import java.awt.image.BufferedImage;
import java.io.*;
import java.net.URL;
import java.util.concurrent.*;

import javax.imageio.ImageIO;

import rx.*;
import rx.Scheduler.Worker;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;


public class ObservableImageCache {
    final ConcurrentMap<String, AsyncSubject<BufferedImage>> image = 
        new ConcurrentHashMap<>();
    public Observable<BufferedImage> get(String url) {
        AsyncSubject<BufferedImage> result = image.get(url);
        if (result == null) {
            result = AsyncSubject.create();
            AsyncSubject<BufferedImage> existing = image.putIfAbsent(url, result);
            if (existing == null) {
                System.out.println("Debug: Downloading " + url);
                AsyncSubject<BufferedImage> a = result;
                Worker w = Schedulers.io().createWorker();
                w.schedule(() -> {
                    try {
                        Thread.sleep(500); // for demo
                        URL u = new URL(url);

                        try (InputStream openStream = u.openStream()) {
                            a.onNext(ImageIO.read(openStream));
                        }
                        a.onCompleted();
                    } catch (IOException | InterruptedException ex) {
                        a.onError(ex);
                    } finally {
                        w.unsubscribe();
                    }
                });
            } else {
                result = existing;
            }
        }
        return result;
    }
    public static void main(String[] args) throws Exception {
        ObservableImageCache cache = new ObservableImageCache();
        CountDownLatch cdl = new CountDownLatch(4);

        Observable<BufferedImage> img1 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
        System.out.println("Subscribing for IMG1");
        img1.subscribe(e -> System.out.println("IMG1: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(500);
        Observable<BufferedImage> img2 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png");
        System.out.println("Subscribing for IMG2");
        img2.subscribe(e -> System.out.println("IMG2: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);

        Observable<BufferedImage> img3 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");
        Observable<BufferedImage> img4 = cache.get("https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png");

        Thread.sleep(500);

        System.out.println("Subscribing for IMG3");
        img3.subscribe(e -> System.out.println("IMG3: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);
        Thread.sleep(1000);
        System.out.println("-> Should be immediate: ");
        System.out.println("Subscribing for IMG4");
        img4.subscribe(e -> System.out.println("IMG4: " + e.getWidth() + "x" + e.getHeight()), Throwable::printStackTrace, cdl::countDown);

        cdl.await();
    }
}

我正在使用 ConcurrentMap 的 putIfAbsent 来确保新 url 只触发一次下载;其他所有人都将收到相同的 AsyncSubject,他们可以“等待”并在可用后立即获取数据。通常,您希望通过使用自定义调度程序来限制并发下载的数量。

【讨论】:

  • 谢谢,我一直不太了解 AsyncSubject 的用例;该文档并不像它可能的那样清楚。特别是,我没有意识到您可以在源 Observable 完成后订阅 AsyncSubject 并仍然获得源 Observable 的最后一次发射。 reactivex.io/RxJava/javadoc/rx/subjects/AsyncSubject.html 的弹珠图和示例代码没有显示此内容。
【解决方案3】:

看看ConnectableObservable.replay() 方法。

我目前正在使用这是我的片段来处理方向变化:

Fragment 的 onCreate:

ConnectableObservable<MyThing> connectableObservable = 
    retrofitService.fetchMyThing()
        .map(...)
        .replay();

connectableObservable.connect(); // this starts the actual network call

Fragment 的 onCreateView:

Subscription subscription = connectableObservable
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(mything -> dosomething());

发生的情况是我只发出 1 个网络请求,并且任何订阅者都会(最终/立即)获得该响应。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-18
    • 2020-12-07
    • 1970-01-01
    • 2018-03-27
    • 1970-01-01
    • 2018-06-20
    • 1970-01-01
    相关资源
    最近更新 更多