【问题标题】:Applying an operation to all previously emitted items将操作应用于所有先前发出的项目
【发布时间】:2021-04-11 17:13:29
【问题描述】:

我有一个项目队列,一旦服务器可达,就会发送:
val queueHistory: Observable<QueuedItem>

QueuedItem 是:
data class QueuedItem(val item: Item, val sent: Boolean = false)

queueHistory 永远不会完成,它只记录一个项目何时排队等待发送onNext(QueuedItem(item1, false),然后再记录它被发送onNext(QueuedItem(item1, true)

我想要做的是获取当前有多少未发送项目的计数。

我的麻烦主要是由于列表未完成,我最初考虑使用collect,但需要一个完整的列表。

我正在玩弄 scan 之类的东西
queueHistory.scan { items: ScannedItems, item -> ScannedItems(arrayOf(*items, item), 0) }
我可以保留到目前为止我遇到的项目的当前列表,但 scan 希望所有内容都是相同的类型。

我的另一个想法是

queueHistory
            .groupBy { it.item }
            .flatMapSingle { it.toList() }
            .map { it.size % 2 }

但是 toList() 需要一个有限列表。

任何想法都将不胜感激!

【问题讨论】:

    标签: rx-java reactive-programming rx-kotlin


    【解决方案1】:

    有一个#scan 重载,它接受一个种子值和一个lambda,它接受两个参数(prev,current)。 prev 参数与种子类型相同,current 参数与上游类型相同。

    示例

    class So65587608 {
        @Test
        fun `65587608`() {
            val producer = PublishSubject.create<QueuedItem>()
    
            val map = producer.scan(mutableListOf<QueuedItem>(), { list, curr ->
                // when send = true -> remove
                if (curr.sent) {
                    list.removeIf { it.item == curr.item }
                } else if (!list.any { it.item == curr.item }) {
                    list.add(curr)
                }
                list.toMutableList()
            }).map { list -> list as List<QueuedItem> }
    
            map.subscribe {
                println(it)
            }
    
            val test = map.test()
    
            producer.onNext(QueuedItem("1"))
            producer.onNext(QueuedItem("1", true))
            producer.onNext(QueuedItem("2", true))
            producer.onNext(QueuedItem("3", true))
            producer.onNext(QueuedItem("4"))
            producer.onNext(QueuedItem("5"))
            producer.onNext(QueuedItem("4", true))
        }
    
        data class QueuedItem(val item: String, val sent: Boolean = false)
    }
    

    输出

    [] // seed value
    [QueuedItem(item=1, sent=false)]
    []
    []
    []
    [QueuedItem(item=4, sent=false)]
    [QueuedItem(item=4, sent=false), QueuedItem(item=5, sent=false)]
    [QueuedItem(item=5, sent=false)]
    

    注意

    您必须在每次#scan 迭代时复制列表,或者使用来自持久数据集合的不可变列表。

    此外,这可能不是一个好的实现,因为列表是未绑定的,这可能会占用您所有的内存。如果列表足够大,线性搜索也可能需要一些时间,这可能不是很好。人们应该考虑如何更好地完成循环。

    【讨论】:

    • 非常感谢您的见解!我也对列表的长度感到困扰,并且正在考虑使用某种缓存的主题/生产者,一旦我向用户展示了所有排队的项目都已发送,我不再需要当前的队列历史记录。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-05-28
    • 2016-03-28
    • 1970-01-01
    • 2019-11-19
    • 2013-11-26
    相关资源
    最近更新 更多