【问题标题】:RxJava Observable to Completable, how to avoid toBlocking()RxJava Observable to Completable,如何避免toBlocking()
【发布时间】:2017-01-08 16:36:24
【问题描述】:

我目前在带有 Kotlin 的 Android 上使用 RxJava,但是我遇到了一个问题,如果不使用 toBlocking() 就无法解决。

我在员工服务中有一个返回 Observable 的方法>:

fun all(): Observable<List<Employee>>

这一切都很好,因为每当员工发生变化时,这个 Observable 都会发出新的员工列表。但是我想从员工那里生成一个 PDF 文件,这显然不需要在每次员工更改时都运行。另外,我想从我的 PDF 生成器方法返回一个 Completable 对象。我想在我的PDF中添加一个标题,然后遍历员工并计算每个员工的工资,这也返回一个Observable,这就是我现在使用toBlocking的地方。我目前的做法是这样的:

private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
                addHeaderToPDF()
                for (i in employees) {
                    val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                    // Print calculated to PDF....
                }
                addFooterToPDF()
                return @map Completable.complete()
            }
            catch (e: Exception) {
                return @map Completable.error(e)
            }
        }.first().toCompletable()

有没有什么方法可以使用 RxJava 让这段代码更简洁一些?

提前致谢!

【问题讨论】:

  • 没有 for 循环,没有 forEach。基本上一切都必须是可观察的,包括addHeaderToPDFaddFooterToPDFcalculateWage 应该返回一个 Single,它要么返回一个值,要么返回错误。
  • calculateWage 是一个 Observable,因为它会在更改时发出重新计算的员工值。你能告诉我一个如何做到这一点的例子吗?我不能把拼图放在一起。 :(
  • ~很好,但是改变值将如何帮助您打印不可变的 PDF?在打印每个员工时,您需要一个工资值。〜理解,这是一个可观察的开始。我正在做一些简洁的事情,敬请期待......

标签: java android rx-java kotlin


【解决方案1】:

免责声明:此答案正在进行中。


基本前提:如果你在信息流中有blocking,那你就错了。

注意:任何状态都不能离开可观察的 lambda。

第 1 步:流式传输整个数据集

输入是员工流。对于每个员工,您需要获得一份工资。让我们把它变成一个流。

/**
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
        employeesObservable: Observable<Employee>,
        wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>>? {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }

        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }

    return employeesAndWagesObservable
}

订阅后会发生什么:

  1. 从源头获取员工。
  2. 获取员工的工资。
  3. 吐出一对员工和他们的工资。

这会一直重复,直到employeesObservable 发出onComplete 的信号,或者onError 出现故障。

使用的运算符:

  • flatMapSingle:将实际值转换为某个转换值的新单流。
  • map:将实际值转换为其他实际值(无嵌套流)。

这就是你将它连接到你的代码的方式:

fun doStuff() {
    val employeesObservable = employeeService.all()
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }

    val employeesAndWagesObservable = 
            getEmployeesAndWagesObservable(employeesObservable, wageProvider)

    // Subscribe...
}

使用的运算符:

  • first:从 observable 中取出第一项并将其转换为单个流。
  • timeout:如果您通过网络获得工资,一个好主意是 .timeout

后续步骤

选项 1:到此结束

不要订阅,打电话

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }

并以同步方式处理每个项目。坐下来,弄清楚接下来的步骤,观看演示,阅读示例。

选项 2:添加图层

  1. .map 每一个 Pair&lt;Employee, Int&gt; 到一些抽象的 PDF 构建块。
  2. 通过Observable.fromCallable { ... } 将您的页眉和页脚打印机转换为 Observables,让它们也返回 PDF 构建块。
  3. 通过Observable.concat(headerObs, employeeDataObs, footerObs)按顺序合并所有这些
  4. .subscribe 对此结果并开始将 PDF 构建块写入 PDF 编写器。
  5. 待办事项:
    • 想办法在订阅时懒惰地初始化 PDF 编写器(而不是在构建流之前),
    • 出错时删除输出,
    • 在完成或出错时关闭输出流。

【讨论】:

  • 很好,但是在这种情况下使用 observable 有什么好处?
  • @KarsenGauss 输入是一个可观察的 + 每个员工的另一个可观察的。在接受一周的 rx 后,只要我的想法允许,我就选择了 rx。这就是目标:不混合阻塞和非阻塞代码,尽可能长时间地流式传输。 如上所述,另一个步骤是将更多阻塞代码转换为 rx。
  • 你的回答很好,不要误会我的意思。我只是想强调,有时可能值得退后一步,考虑一种非反应性的方法。对于@szantogab 的问题,我看不到反应式解决方案有那么多好处。
  • @KarsenGauss 你和我都:D 但是有一种方法可以在此结束时流式传输到 PDF 文件(对于 syantogab)。并在此过程中(对我而言)在 rx 方面做得更好。
【解决方案2】:

我想出了这个:

    return employeeService.all().first()
            .doOnSubscribe { addHeaderToPDF() }
            .flatMapIterable { it }
            .flatMap { employeeService.calculateWage(it.id).first() }
            .doOnNext { printEmployeeWage(it) }
            .doOnCompleted { addFooterToPDF }
            .toCompletable()

这是应该的吗? :)

【讨论】:

  • 与此同时,我发现它不起作用,因为在 doOnNextdoOnCompleted 闭包中,我需要访问在 doOnSubscribe 闭包中创建的 PDF 文件。怎么做? :S
猜你喜欢
  • 1970-01-01
  • 2017-08-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-04
  • 2019-01-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多