【问题标题】:RxJava - Observables: One Observable than return other ObservableRxJava - Observables:一个 Observable 比返回另一个 Observable
【发布时间】:2016-10-20 13:41:04
【问题描述】:

我正在一个应用程序中工作,而不是将一个对象列表转换为其他对象,为此,我使用 Rx Java 的 Observable。

我有两种方法,一种是具有异步服务的适配器(Jersey 外观),此方法从另一个服务消耗一个 Observable。

我需要消耗一个 Observable 并在完成后处理每个项目

为了处理每个项目,我使用了一个 flatmap 运算符,但我不知道如何创建一个新的观察者,它具有一种类型,例如一个由 flatmap 运算符处理的所有每个项目的列表。

有什么想法吗?

谢谢

更新:

这段代码,处理每个元素并返回另一个 Observable,但我不知道这是否做得很好。

@Override
public Observable<ArrayList> getGeoJson2() {
    WKTReader2 reader2 = new WKTReader2();
    WKBReader wkbReader = new WKBReader();
    ArrayList featureCollection = new ArrayList();

    Subject<ArrayList,ArrayList> subject = PublishSubject.create();

    manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
        @Override
        public SimpleFeature call(Manzana manzana) {
            try {
                SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null);
            }catch (Exception e){
                System.out.println(e.getMessage());
                return null;
            }

        }
    }).subscribe(new Subscriber<SimpleFeature>() {
        @Override
        public void onCompleted() {
            subject.onNext(featureCollection);
            subject.onCompleted();
        }

        @Override
        public void onError(Throwable throwable) {
            subject.onError(throwable);
        }

        @Override
        public void onNext(SimpleFeature simpleFeature) {
            featureCollection.add(simpleFeature);
        }
    });

    return subject;
}

而这段代码是使用 Observable 返回者的:

@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async/geom")
public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {

    Observable<ArrayList> features = service.getGeoJson2();

    features.subscribe(new Observer<ArrayList>() {
        @Override
        public void onCompleted() {

            System.out.println("Se completo la accion!!!");
        }

        @Override
        public void onError(Throwable throwable) {

            System.out.println(throwable.getMessage());
        }

        @Override
        public void onNext(ArrayList features) {
            asyncResponse.resume(features);
        }
    });
}

永远不会调用 onNext() 方法!!!

谢谢

【问题讨论】:

  • 请提供一个简短的代码示例,说明您到目前为止所尝试的内容。
  • 你能不能也展示一下这个方法返回什么? manzanaRepository.getManzanas()
  • 可观察的

标签: rx-java


【解决方案1】:

尝试用这个替换你的getGeoJson2()

@Override
public Observable<List<SimpleFeature>> getGeoJson2() {
    return manzanaRepository.getManzanas()
            .map(new Func1<Manzana, SimpleFeature>() {
                @Override
                public SimpleFeature call(Manzana manzana) {
                    try {
                        SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                        return SimpleFeatureBuilder.build( TYPE, new Object[]{ null, "name1"}, null));
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                        return null;
                    }
                }
            })
            .filter(new Func1<SimpleFeature, Boolean>() {
                @Override
                public Boolean call(SimpleFeature sf) {
                    return sf != null;
                }
            })
            .toList();
}

解释: 使用toList() 操作符,它等待来自源observable 的onCompleted,然后将源observable 发出的所有项目作为列表发出。

【讨论】:

  • 感谢您的回复,但如果我想返回一个 Observable ,其中 X 不是一个列表,而是由每个已处理的项目以及其他信息组成。如何创建新的观察者
  • 我想你会在toList 之后使用map 和/或flatMap。我需要查看更多代码才能给出更好的答案:D
【解决方案2】:

您的代码的问题是您正在使用 PublishSubject,它在您开始使用之前完成

如果您将其更改为 BehaviorSubject,它将起作用

然而,更好(而且更短)如下实现它 -

manzanaRepository.getManzanas().map(/**/).collect(/*create array*/, /*add next item*/)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-11-21
    • 2020-10-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-05
    • 2015-02-05
    相关资源
    最近更新 更多