【问题标题】:How to wait for a flow to complete emitting the values如何等待流程完成发出值
【发布时间】:2021-05-04 09:31:23
【问题描述】:

我的存储库中有一个函数“getUser”,它根据提供的 id 发出一个代表用户的对象。

flow function

fun getUser(id: String) = callbackFlow {
    val collectionReference: CollectionReference =
        FirebaseFirestore.getInstance().collection(COLLECTION_USERS)

    val query: Query = collectionReference.whereEqualTo(ID, id)

    query.get().addOnSuccessListener {
        val lst = it.toObjects(User::class.java)
        if (lst.isEmpty())
            offer(null)
        else
            offer(it.toObjects(User::class.java)[0])
    }

    awaitClose()
}

我在另一个类中需要这些值。我遍历一个 id 列表,并将收集到的用户添加到一个新列表中。我如何在收集值时等待列表完成,然后再调用 return?

collector function

private fun computeAttendeesList(reminder: Reminder): ArrayList<User> {
    val attendeesList = arrayListOf<User>()
    for (friend in reminder.usersToShare) {
        repoScope.launch {
            Repository.getUser(friend).collect {
                it?.let { user ->
                    if (!attendeesList.contains(user))
                        attendeesList.add(user)
                }
            }
        }
    }
    return attendeesList
}

我不想使用实时数据,因为这不是与 UI 相关的类。

【问题讨论】:

  • 请不要发布屏幕截图,它们不可搜索,并且很难建立有用的响应,因为我们需要输入所有内容。请在您的问题中使用代码块粘贴代码。
  • 您希望您的computeAttendeesList 在等待时做什么?你想让它暂停吗?还是阻塞当前线程?如果是前者,你应该使用suspend自己的方法,如果是后者,你可以使用runBlocking。无论哪种情况,您都有多种方法可以做到这一点,因此这将有助于获得有关您要在哪个范围内运行以及要阻止哪些线程(如果有的话)的更多信息。
  • 对不起@Joffrey,我发布了代码。好吧,我需要是一个阻塞代码,只有在attendeesList被计算之后,我想被返回以便被另一个函数使用。
  • 另外,你的repoScope 是单线程的吗?因为您正在同时更新此处不是线程安全的 ArrayList。
  • 如果你想让它提供一个完成的列表,那么有一个suspend fun getList() : List&lt;T&gt; 并从一个挂起的上下文中调用它。我认为您正在尝试用一种解决方案解决两个问题。该流程很好地发出构造列表所需的值,但如果您需要一个完成的列表,感兴趣的对象应该简单地接收这样的东西。

标签: kotlin


【解决方案1】:

这段代码中有多个问题需要解决:

  1. getUser() 旨在返回一个 User,但它目前返回一个 Flow&lt;User&gt; 这永远不会结束,并且永远不会返回一个以上的用户。
  2. 从多个并发查询构造用户列表的方式不是线程安全的(因为在多线程 IO 调度器上执行多个launches,它们都直接更新同一个不安全列表)
  3. 实际用例是从 Firebase 获取用户列表,但使用单个 ID 的多个查询而不是单个查询

#1 的解决方案

让我们先解决#1。这是getUser() 的一个版本,它暂停单个User 而不是返回Flow

suspend fun getUser(id: String): User {
    val collectionReference = FirebaseFirestore.getInstance().collection(COLLECTION_USERS)
    val query = collectionReference.whereEqualTo(ID, id)

    return query.get().await().let { it.toObjects(User::class.java) }.firstOrNull()
}

// use the kotlinx-coroutines-play-services library instead
private suspend fun <T> Task<T>.await(): T {
    return suspendCancellableCoroutine { cont ->
        addOnCompleteListener {
            val e = exception
            if (e == null) {
                @Suppress("UNCHECKED_CAST")
                if (isCanceled) cont.cancel() else cont.resume(result as T)
            } else {
                cont.resumeWithException(e)
            }
        }
    }
}

事实证明,这个await() 函数已经被编写(以更好的方式)并且在kotlinx-coroutines-play-services library 中可用,因此您实际上不需要自己编写。

#2 的解决方案

如果我们不能按照#3重写整个事情,我们可以这样处理问题#2:

private suspend fun computeAttendeesList(reminder: Reminder): List<User> {
    return reminder.usersToShare
        .map { friendId -> 
            repoScope.async { Repository.getUser(friendId) }
        }
        .map { it.await() }
        .toList()
}

#3 的解决方案

相反,我们可以直接向 Firebase 查询整个列表:

suspend fun getUsers(ids: List<String>): List<User> {
    val collectionReference = FirebaseFirestore.getInstance().collection(COLLECTION_USERS)
    val query = collectionReference.whereIn(ID, ids)

    return query.get().await().let { it.toObjects(User::class.java) }
}

然后以非常基本的方式消费它:

private suspend fun computeAttendeesList(reminder: Reminder): List<User> {
    return Repository.getUsers(reminder.usersToShare)
}

或者,您可以使此函数阻塞(删除suspend)并将您的调用包装在runBlocking 中(如果您确实需要阻塞当前线程)。

请注意,此解决方案未强制执行任何调度程序,因此如果您需要特定范围或调度程序,可以使用 withContext 包装其中一个挂起函数调用。

【讨论】:

  • 是的,谢谢,#3 是最好的方法,我刚刚更改了它并且它有效。祝你有美好的一天!
猜你喜欢
  • 2023-04-07
  • 2018-06-21
  • 2016-10-16
  • 2012-03-16
  • 2010-12-07
  • 1970-01-01
  • 2013-03-22
  • 2010-11-06
相关资源
最近更新 更多