【发布时间】:2021-12-18 12:34:24
【问题描述】:
我已经实现了一个同步迭代器,它从外部服务获取批量数据(可能需要一些时间或挂起)。
我想在调用者代码迭代当前批次序列的条目时异步获取下一个批次。
理想情况下,我想使用 kotlin 协程。
期望的过程:
- 迭代器获取第一批
- 迭代器开始在后台获取第二批
- 同时,调用者处理第一批
- 在处理完第一批之后,调用者可以立即处理第二批,这是由迭代器在后台获取的
- 迭代器开始在后台获取第三批
- 处理完第二批后,调用者可以立即处理后台迭代器获取的第三批
- 迭代器开始在后台获取第四批
- 等
我的同步实现:
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
return object : Iterator<LogPayloadType>, AutoCloseable {
val logging: Logging = options.service
var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...))
var i = 0
var batch: List<LogEntry> = currentPage.values.toList()
var isClosed = false
override fun close() {
logging.close()
isClosed = true
}
override fun hasNext(): Boolean {
if (isClosed)
return false
val hasNext = i < batch.size || currentPage.hasNextPage()
if (!hasNext) {
logging.close()
}
return hasNext
}
override fun next(): LogPayloadType {
if (!hasNext()) {
throw NoSuchElementException()
}
if (i == batch.size - 1 && currentPage.hasNextPage()) {
currentPage = currentPage.nextPage
batch = currentPage.values.toList()
i = 0
}
val logEntry = batch[i++]
return logEntry.getPayload()
}
}.asSequence()
}
Ofc 我对完全不同的解决方案持开放态度,只要它们可以包装在 kotlin 的序列中。
编辑
这是一个使用thread { } 的异步实现。我无法使用协程实现这一目标
fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
return object : Iterator<LogPayloadType> {
private var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
private var i = 0
private var batch: List<LogEntry> = currentPage.values.toList()
private var nextPage: Page<LogEntry>? = null
private var job: Thread? = null
override fun hasNext() = i < batch.size || currentPage.hasNextPage()
override fun next(): LogPayloadType {
if (!hasNext()) {
throw NoSuchElementException()
}
if (currentPage.hasNextPage()) {
if (nextPage == null && job == null) {
job = thread { nextPage = readNextPage(currentPage) }
}
if (i == batch.size) {
job!!.join()
currentPage = nextPage!!
job = thread { nextPage = readNextPage(currentPage) }
batch = currentPage.values.toList()
i = 0
}
}
val logEntry = batch[i++]
return logEntry.getPayload<Payload<*>>()
}
private fun readNextPage(curPage: Page<LogEntry>): Page<LogEntry>? = curPage.nextPage
}.asSequence()
}
【问题讨论】:
-
当然会阻塞,但如果提前下载批次,可能会阻塞更短的时间
-
我添加了一个使用普通旧线程的实现
-
我是否正确,您只能在前一页完全加载时才开始加载页面,这样您就有了可以调用
readNextPage的下一个 Page 对象?所以这个序列中没有并行性,但是迭代序列的线程可以在当前页面上工作,而序列正在预加载下一页? -
是的,这是正确的。
标签: kotlin asynchronous google-cloud-platform iterator java-stream