【问题标题】:Kotlin coroutines: How to use Array channel with filter / map?Kotlin 协程:如何将数组通道与过滤器/映射一起使用?
【发布时间】:2018-11-13 14:03:57
【问题描述】:

我需要在各个通道处理器之间插入一个包含 10000 个元素的缓冲区。

produce() 提供了一种配置缓冲区大小的方法:

produce(capacity = 10_000) {
}

但是mapfilter 默认为 Rendezvous 频道:

fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context) { // No capacity specified, defaults to 0
        consumeEach {
            send(transform(it))
        }
    }

有没有办法配置它?目前我正在用缓冲区构建我自己的这些 stdlib 函数版本,这不是很优雅。

【问题讨论】:

  • 如果您使用Unconfined 以外的调度程序,我想知道map 甚至会如何与集合通道一起使用。
  • @MarkoTopolnik 为什么你认为它不起作用?
  • @qwwdfsad 后来我意识到它会,但很尴尬。发送协程将保持挂起状态,直到接收协程获取并处理该项目。
  • 澄清一下:发送协程是GlobalScope.produce,而不是向原始通道发送元素的那个。老实说,我不明白为什么热流很尴尬
  • @qwwdfsad 我自动假设这里的produce 只是一个独立的示例,实际工作将涉及通信协程。

标签: kotlin kotlinx.coroutines


【解决方案1】:

唯一的方法是使用capacity 参数提供您自己的地图实现:

fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
    produce(context, capacity = capacity) {
        consumeEach {
            send(transform(it))
        }
    }

我创建了一个https://github.com/Kotlin/kotlinx.coroutines/issues/841,但不会很快实现。当引入冷流以使热数据源和冷数据源之间的取消和范围层次结构保持一致时,所有通道运算符都可能被重新设计。

【讨论】:

    猜你喜欢
    • 2020-10-02
    • 2020-04-12
    • 1970-01-01
    • 1970-01-01
    • 2021-01-24
    • 2021-06-18
    • 2022-10-02
    • 2020-07-05
    • 2011-06-30
    相关资源
    最近更新 更多