【问题标题】:Is there any better approach to set a Listener n times inside Coroutines?有没有更好的方法在 Coroutines 中设置一个 Listener n 次?
【发布时间】:2020-05-19 08:42:41
【问题描述】:

在我的项目中,我根据 List 的大小在 for 循环中调用 Firebase 数据库 n 次,并通过获取每个侦听器的响应来返回结果。我在 Coroutine 中完成所有这些工作。这是我的代码

 fun Flow<List<Basket>>.mapBasketListToItemsList() : Flow<List<Items>> = map{it : List<Basket> ->
     val itemsList = Collections.synchronizedList(ArrayList<Items>())

     coroutineScope{
         it.forEach {it : Basket ->
            launch {
                databaseReference.child("Items").child(it.itemsId!!)
                    .addListenerForSingleValueEvent(object : ValueEventListener {
                       override fun onCancelled(p0: DatabaseError) {

                       }

                       override fun onDataChange(dataSnapshot: DataSnapshot) {
                           if (dataSnapshot.exists()) {
                               val item = dataSnapshot.getValue(Items::class.java)
                               itemsList.add(item!!)
                           }
                       }
                   })
           }
       }
   }
    itemsList
}

但我感觉很奇怪,因为我不知道我这样做是否正确。所以我想知道是否有更好的方法来完成这项工作。

【问题讨论】:

  • 您好 Mattwalk,欢迎来到 SO。你有没有看过协程和延续?这是一个提到 Firebase 的问题,也许它会给你一个指向那个方向的指针? (它仍然是 PITA,顺便说一句)github.com/Kotlin/kotlinx.coroutines/issues/48
  • 好的,我看看
  • @AnoopM 好吧...

标签: android multithreading firebase kotlin coroutine


【解决方案1】:

您不能使用此类操作通过 suspendCoroutine 之类的函数传递异步值流,以免我们在第二次尝试继续继续时得到 IllegalStateException,因为 Kotlin 暂停和继续是单次的。

Flow 被明确设计为表示多个值的冷异步流。您可以使用callbackFlow 函数将多镜头回调转换为流:

使用示例:

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback { // implementation of some callback interface
        override fun onNextValue(value: T) {
            // To avoid blocking you can configure channel capacity using
            // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
            try {
                sendBlocking(value)
            } catch (e: Exception) {
                // Handle exception from the channel: failure in flow or premature closing
            }
        }
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("API Error", cause))
        }
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    /*
     * Suspends until either 'onCompleted'/'onApiError' from the callback is invoked
     * or flow collector is cancelled (e.g. by 'take(1)' or because a collector's coroutine was cancelled).
     * In both cases, callback will be properly unregistered.
     */
        awaitClose { api.unregister(callback) }
    }

Read More

【讨论】:

猜你喜欢
  • 2011-04-13
  • 1970-01-01
  • 2020-11-28
  • 2011-04-04
  • 1970-01-01
  • 2020-04-21
  • 2023-03-16
  • 2020-06-21
  • 1970-01-01
相关资源
最近更新 更多