【发布时间】:2021-04-11 17:13:29
【问题描述】:
我有一个项目队列,一旦服务器可达,就会发送:val queueHistory: Observable<QueuedItem>
QueuedItem 是:data class QueuedItem(val item: Item, val sent: Boolean = false)
queueHistory 永远不会完成,它只记录一个项目何时排队等待发送onNext(QueuedItem(item1, false),然后再记录它被发送onNext(QueuedItem(item1, true)。
我想要做的是获取当前有多少未发送项目的计数。
我的麻烦主要是由于列表未完成,我最初考虑使用collect,但需要一个完整的列表。
我正在玩弄 scan 之类的东西queueHistory.scan { items: ScannedItems, item -> ScannedItems(arrayOf(*items, item), 0) }
我可以保留到目前为止我遇到的项目的当前列表,但 scan 希望所有内容都是相同的类型。
我的另一个想法是
queueHistory
.groupBy { it.item }
.flatMapSingle { it.toList() }
.map { it.size % 2 }
但是 toList() 需要一个有限列表。
任何想法都将不胜感激!
【问题讨论】:
标签: rx-java reactive-programming rx-kotlin