【问题标题】:Kotlin - Chunk sequence based on size and timeKotlin - 基于大小和时间的块序列
【发布时间】:2018-06-25 11:35:16
【问题描述】:

我有一个永无止境的流作为一个序列。 我的目标是根据时间和大小从序列中抽取一批。

我的意思是,如果我的序列现在有 2250 条消息,我想发送 3 批(1000、1000、250)。

另外,如果到下一个 5 分钟我还没有积累 1000 条消息,我无论如何都会发送到目前为止积累的任何信息。

        sequence
        .chunked(1000)
        .map { chunk ->
            // do something with chunk
        }

我期望拥有的是 .chunked(1000, 300) 之类的东西,当我想每 5 分钟发送一次时,300 是第二个。

提前致谢

【问题讨论】:

  • 我一直在搜索,但找不到任何相关问题。如果您知道任何相关的链接,将不胜感激。
  • Kotlin 序列通常不处理时间的概念,它们上没有一个运算符会延迟或任何类似的事情。这更多地属于协程领域,可能有类似的东西,或者您可以将自己的运算符实现为扩展函数,它可以满足您的需要,但您愿意。
  • 我很难相信我是第一个研究这个问题的人,但同时找不到合适的东西
  • 你肯定不是第一个 - Rx 可能已经涵盖了这一点,只是序列还不支持这些操作。
  • 谢谢。如果有适合我的协程解决方案。

标签: kotlin collections kotlin-coroutines


【解决方案1】:

Kotlin Sequence 是一个同步概念,不应该以任何有时间限制的方式使用。如果您向序列询问下一个元素,那么它会阻塞调用者线程,直到它产生下一个元素并且无法取消它。

但是,kotlinx.coroutines 库引入了Channel 的概念,它是异步世界的序列的粗略模拟,其中操作可能需要一些时间才能完成,并且在执行此操作时不会阻塞线程。你可以在this guide阅读更多内容。

它不提供现成的chunked 运算符,但可以很容易地编写一个。您可以使用以下代码:

import kotlinx.coroutines.experimental.channels.*
import kotlinx.coroutines.experimental.selects.*

fun <T> ReceiveChannel<T>.chunked(size: Int, time: Long) =
    produce<List<T>>(onCompletion = consumes()) {
        while (true) { // this loop goes over each chunk
            val chunk = mutableListOf<T>() // current chunk
            val ticker = ticker(time) // time-limit for this chunk
            try {
                whileSelect {
                    ticker.onReceive {
                        false  // done with chunk when timer ticks, takes priority over received elements
                    }
                    this@chunked.onReceive {
                        chunk += it
                        chunk.size < size // continue whileSelect if chunk is not full
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                return@produce // that is normal exception when the source channel is over -- just stop
            } finally {
                ticker.cancel() // release ticker (we don't need it anymore as we wait for the first tick only)
                if (chunk.isNotEmpty()) send(chunk) // send non-empty chunk on exit from whileSelect
            }
        }
    }

从这段代码中可以看出,它嵌入了一些关于在极端情况下应该做什么的重要决定。如果计时器到期但当前块仍然为空,我们该怎么办?此代码开始新的时间间隔并且不发送前一个(空)块。我们是在最后一个元素之后超时完成当前块,从第一个元素开始测量时间,还是从块开始测量时间?这段代码在后面。

这段代码完全是顺序的——它的逻辑很容易一步一步地遵循(代码内部没有并发)。可以根据任何特定项目的要求对其进行调整。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2021-12-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-14
相关资源
最近更新 更多