【问题标题】:RxJava valve use caseRxJava 阀门用例
【发布时间】:2020-08-05 03:22:40
【问题描述】:

RxJava 中是否有操作符、外部库或我缺少的方法来创建可接收控制数据发射的函数(如阀门)的可流动/可观察的?

我有一个巨大的 json 文件需要处理,但我必须获取文件的一部分、实体列表、处理它然后获取另一部分,我尝试使用 windows()、buffer() 但BiFunction 我传递给 Flowable.generate() 在我收到第一个列表后继续执行并且我还没有完成处理它。我还尝试了 hu.akarnokd.rxjava3.operators 中的 FlowableTransformers.valve() 但它只是在处理列表的 flatMap() 函数之前堆积了项目

private Flowable<T> flowable(InputStream inputStream) {

    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        final var token = jsonParser.nextToken();

        if (token == null) {
            emitter.onComplete();
        }

        if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
            return jsonParser;
        }

        if (JsonToken.START_OBJECT.equals(token)) {
            emitter.onNext(reader.readValue(jsonParser));
        }

        return jsonParser;
    }, JsonParser::close);
}

编辑:我需要控制项目的排放,以免内存和处理数据的函数过载,因为该函数读取和写入数据库,也需要按顺序处理。处理数据的函数并不完全是我的,它是用 RxJava 编写的,预计我会使用 Rx。

我设法像这样解决它,但如果有其他方法请告诉我:

public static <T> Flowable<T> flowable(InputStream inputStream, JsonFactory jsonFactory, ObjectReader reader, Supplier<Boolean> booleanSupplier) {
    return Flowable.generate(() -> jsonFactory.createParser(new GZIPInputStream(inputStream)), (jsonParser, emitter) -> {

        if (booleanSupplier.get()) {
            final var token = jsonParser.nextToken();

            if (token == null) {
                emitter.onComplete();
            }

            if (JsonToken.START_ARRAY.equals(token) || JsonToken.END_ARRAY.equals(token)) {
                return jsonParser;
            }

            if (JsonToken.START_OBJECT.equals(token)) {
                emitter.onNext(reader.readValue(jsonParser));
            }

        }
        
        return jsonParser;
    }, JsonParser::close);
}

Edit2:这是我目前使用该功能的方式之一

public Flowable<List<T>> paging(Function<List<T>, Single<List<T>>> function) {
    final var atomicInteger = new AtomicInteger(0);
    final var atomicBoolean = new AtomicBoolean(true);

    return flowable(inputStream, jsonFactory, reader, atomicBoolean::get)
            .buffer(pageSize)
            .flatMapSingle(list -> {

                final var counter = atomicInteger.addAndGet(1);

                if (counter == numberOfPages) {
                    atomicBoolean.set(false);
                }

                return function.apply(list)
                        .doFinally(() -> {
                            if (atomicInteger.get() == numberOfPages) {
                                atomicInteger.set(0);
                                atomicBoolean.set(true);
                            }
                        });
            });
}

【问题讨论】:

  • 您要解决什么问题(参见 xyproblem.info)?你绝对需要使用 RxJava,例如出于学术目的?该线程提供了一些使用 Java 流式传输大型 JSON 文件的解决方案,但不使用 RxJava:stackoverflow.com/questions/9390368/…
  • @DV82XL 我在问题中添加了更多信息,我需要 RxJava,因为这是 RxJava 链的一小部分
  • 感谢您澄清问题。
  • Generate 将调用 lambda 的次数与已有请求的次数相同。之后你如何处理 json 值?许多运营商都使用预取或 capacityHint 参数进行重载,以限制未完成的请求数量。
  • @akarnokd 我用一个例子更新了帖子,json 值被处理并保存到数据库中,据我所知,这个操作非常昂贵,所以我需要一种方法来控制多少数据我正在发送

标签: java reactive-programming rx-java2 rx-java3


【解决方案1】:

设法像这样解决它

 public static Flowable<Object> flowable(JsonParser jsonParser, ObjectReader reader, PublishProcessor<Boolean> valve) {
    return Flowable.defer(() -> {
        final var token = jsonParser.nextToken();

        if (token == null) {

            return Completable.fromAction(jsonParser::close)
                    .doOnError(Throwable::printStackTrace)
                    .onErrorComplete()
                    .andThen(Flowable.empty());
        }


        if (JsonToken.START_OBJECT.equals(token)) {
            final var value = reader.readValue(jsonParser);
            final var just = Flowable.just(value).compose(FlowableTransformers.valve(valve, true));
            return Flowable.concat(just, flowable(jsonParser, reader, valve));
        }


        return flowable(jsonParser, reader, valve);
    });
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-12-22
    • 1970-01-01
    • 2015-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-13
    相关资源
    最近更新 更多