【问题标题】:Iterator which asynchronously fetches the next batch from a service [kotlin coroutines]从服务中异步获取下一批的迭代器 [kotlin coroutines]
【发布时间】:2021-12-18 12:34:24
【问题描述】:

我已经实现了一个同步迭代器,它从外部服务获取批量数据(可能需要一些时间或挂起)。

我想在调用者代码迭代当前批次序列的条目时异步获取下一个批次。

理想情况下,我想使用 kotlin 协程

期望的过程:

  1. 迭代器获取第一批
  2. 迭代器开始在后台获取第二批
  3. 同时,调用者处理第一批
  4. 在处理完第一批之后,调用者可以立即处理第二批,这是由迭代器在后台获取的
  5. 迭代器开始在后台获取第三批
  6. 处理完第二批后,调用者可以立即处理后台迭代器获取的第三批
  7. 迭代器开始在后台获取第四批

我的同步实现:

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType>, AutoCloseable {
            val logging: Logging = options.service
            var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...))
            var i = 0
            var batch: List<LogEntry> = currentPage.values.toList()
            var isClosed = false

            override fun close() {
                logging.close()
                isClosed = true
            }

            override fun hasNext(): Boolean {
                if (isClosed) 
                    return false
                val hasNext = i < batch.size || currentPage.hasNextPage()
                if (!hasNext) {
                    logging.close()
                }
                return hasNext
            }

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                if (i == batch.size - 1 && currentPage.hasNextPage()) {
                    currentPage = currentPage.nextPage
                    batch = currentPage.values.toList()
                    i = 0
                }
                val logEntry = batch[i++]
                return logEntry.getPayload()
            }
        }.asSequence()
    }

Ofc 我对完全不同的解决方案持开放态度,只要它们可以包装在 kotlin 的序列中。

编辑

这是一个使用thread { } 的异步实现。我无法使用协程实现这一目标

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType> {
            private var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            private var i = 0
            private var batch: List<LogEntry> = currentPage.values.toList()
            private var nextPage: Page<LogEntry>? = null
            private var job: Thread? = null

            override fun hasNext() = i < batch.size || currentPage.hasNextPage()

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                if (currentPage.hasNextPage()) {
                    if (nextPage == null && job == null) {
                        job = thread { nextPage = readNextPage(currentPage) }
                    }
                    if (i == batch.size) {
                        job!!.join()
                        currentPage = nextPage!!
                        job = thread { nextPage = readNextPage(currentPage) }
                        batch = currentPage.values.toList()
                        i = 0
                    }
                }
                val logEntry = batch[i++]
                return logEntry.getPayload<Payload<*>>()
            }

            private fun readNextPage(curPage: Page<LogEntry>): Page<LogEntry>? = curPage.nextPage

        }.asSequence()
    }

【问题讨论】:

  • 当然会阻塞,但如果提前下载批次,可能会阻塞更短的时间
  • 我添加了一个使用普通旧线程的实现
  • 我是否正确,您只能在前一页完全加载时才开始加载页面,这样您就有了可以调用readNextPage 的下一个 Page 对象?所以这个序列中没有并行性,但是迭代序列的线程可以在当前页面上工作,而序列正在预加载下一页?
  • 是的,这是正确的。

标签: kotlin asynchronous google-cloud-platform iterator java-stream


【解决方案1】:

请记住,由于您正在使用我不知道的所有类,因此我无法测试以下任何代码,因此可能会出现错误。

首先,使用sequence builder 可以大大简化您使用线程的代码:

// Not sure about how this should behave as you treat it like a blocking function.
// I return null when it's exhausted to simplify while loop.
private fun <T> readNextPageOrNull(page: Page<T>): Page<T>? = 
    if (page.hasNextPage()) page.nextPage!! else null

fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var jobResult: Page<LogEntry>? = null
        var job = thread {
            jobResult = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            job.join()
            val page = jobResult ?: break
            job = thread { jobResult = readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

您可以通过使用CompletableFuture.supplyAsync {} 而不是thread {} 来进一步简化它并利用线程池而不是使用一堆新线程:

fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
    return sequence {
        var job = CompletableFuture.supplyAsync {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.join() ?: break
            job = CompletableFuture.supplyAsync { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

我们可以将其转换为使用协程,但是将协程代码转换为阻塞代码很尴尬。你必须使用runBlocking。这不会给您带来太多好处,但是如果您正在使用协程,它将使用您可能已经在使用的协程调度程序线程池。这里,coroutineScope 是当前类中合适的范围。

fun readStructuredLogs(): Sequence<Payload.JsonPayload> {
    return sequence {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = runBlocking { job.await() } ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            yieldAll(page.values.asIterable())
        }
    }
}

如果您已经在使用协程,您可能会考虑使用 Flow 而不是 Sequence,以便在您等待下一个项目时暂停而不是阻塞。使用 Flow 运算符可能有更简单的方法来做到这一点,但我只是对上面的序列代码进行了快速修改:

fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
            logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        }
        while (true) {
            val page = job.await() ?: break
            job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
            emitAll(page.values.asFlow())
        }
    }
}

编辑:使用buffer 执行此操作的可能方法:

fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
    return flow {
        var page = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
        while (true) {
            emit(page)
            page = readNextPageOrNull(page) ?: break
        }
    }
        .flowOn(Dispatchers.IO)
        .buffer(1)
        .flatMapConcat { it.values.asFlow() }
}

【讨论】:

  • 哇,谢谢。我会尝试使用这些,看看单元测试是否通过并给你更多反馈。我完全忘记了产量/排放!
  • 是的,yieldAll 要简单得多。我更新了它。 ?: break?: continue?: return 模式是一个有用的 Kotlin 习惯用法,通常有助于避免在 if 或 else 块中嵌套一些代码。
  • 我以coroutineScope 为例。每当您使用协程时,您都必须有一些范围来启动协程。这是协同程序的一个关键概念,您应该在使用它们之前对其进行审查。如果您在 Android、Swing、JavaFX 等框架中工作,相关的 kotlinx 协程库会为您提供适当的协程范围。您还可以使用CoroutineScope(Dispatchers.IO)CoroutineScope(Dispatchers.Default)MainScope() 手动创建自己的范围并手动管理其生命周期,具体取决于您的使用方式。
  • 一个自创的协程作用域应该被存储在一个类的属性中,并且当关联的类达到它的生命结束时它应该被取消,这样任何当前运行的与该类关联的协程都将被取消。框架提供的范围已设置为已执行此操作。例如,在 Android 上,ViewModel 类提供viewModelScope,当 ViewModel 的生命结束时它会自动取消(所有关联的活动/片段都被销毁),因此它的协程被取消并停止在不会被后台任务上浪费资源用过。
  • 是的,我认为这很合适。但我认为您需要返回一次发出一页,然后链接.buffer(1),然后链接flatMapConcat { it.values.asFlow() }。否则,您将缓冲每个页面中的每个单独的值,因此预加载可能会在开始之前延迟。从好的方面来说,您不必处理延迟的工作或担​​心特定的协程范围。我将添加一个可能的示例。
猜你喜欢
  • 2020-01-15
  • 1970-01-01
  • 1970-01-01
  • 2016-07-02
  • 2014-10-28
  • 2016-01-21
  • 1970-01-01
  • 1970-01-01
  • 2014-02-04
相关资源
最近更新 更多