【问题标题】:Correctly implementing wait and notify in Kotlin在 Kotlin 中正确实现等待和通知
【发布时间】:2017-06-16 12:49:04
【问题描述】:

根据本文档,不鼓励在 Kotlin 中使用 waitnotifyhttps://kotlinlang.org/docs/reference/java-interop.html

等待()/通知()

Effective Java Item 69 建议优先使用并发实用程序而不是 wait() 和 notify()。因此,这些方法不适用于 Any 类型的引用。

但是该文档没有提出任何正确的方法。

基本上,我想实现一个服务,它会读取输入数据并处理它们。如果没有输入数据,它将暂停自己,直到有人通知有新的输入数据。类似的东西

while (true) {
    val data = fetchData()
    processData(data)
    if (data.isEmpty()) {
        wait()
    }
}

编辑:

我不想使用这些不推荐的方法(反模式),我真的很想知道如何正确地做到这一点。

在我的情况下,fetchData 从数据库中读取数据,因此在我的情况下无法使用队列。

【问题讨论】:

  • 你检查了 Effective Java Item 69 吗?
  • 您可以使用 Kotlin 协程中的演员来实现您的服务。它等待项目被发送到通道。更多信息在这里:github.com/Kotlin/kotlinx.coroutines/blob/master/…
  • 您可以将任何对象转换为java.lang.Object 并轻松实现此类反模式。
  • 目前协程中的actor 已被弃用,所以这也不是一个好的选择。

标签: kotlin


【解决方案1】:

一般来说,您应该尽可能使用更高级别的并发实用程序。

但是,如果在您的情况下没有更高级别的构造起作用,则直接 替换是使用 ReentrantLock 和单 Condition 在那把锁上。

例如,如果您的 Java 代码类似于:

private Object lock = new Object();

...

synchronized(lock) {
    ...
    lock.wait();
    ...
    lock.notify();
    ...
    lock.notifyAll();
    ...
}

您可以将其更改为以下 Kotlin:

private val lock = ReentrantLock()
private val condition = lock.newCondition()

lock.withLock {           // like synchronized(lock)
    ...
    condition.await()     // like wait()
    ...
    condition.signal()    // like notify()
    ...
    condition.signalAll() // like notifyAll()
    ...
}

虽然这有点冗长,但条件确实提供了一些额外的 灵活性,因为您可以在单个锁上具有多个条件,并且 还有其他类型的锁(特别是ReentrantReadWriteLock.ReadLockReentrantReadWriteLock.WriteLock)。

请注意,withLock 是 Kotlin 提供的扩展函数,负责在调用提供的 lambda 之前/之后调用 Lock.lock()/Lock.unlock()

【讨论】:

  • Kotlin 还添加了 withLock 更高级别的函数,这使得它变得更好!
  • @JonTirsen 谢谢!不知何故,我错过了标准库中withLock 的存在(我发誓我会寻找这样的东西)。我已经更新了使用它的答案。
  • 您仍然必须使用变量boolean 来检查线程是否正在等待,所以对我来说最好使用synchronized + Object
  • @user924 什么意思?
  • wait()/notify()ReeantrantLock/Condition 与协程一起使用是没有意义的。这两种方法都会阻塞线程,这在 Couroutines 中是不可接受的。
【解决方案2】:

BlockingQueue 可以是适合您用例的高级并发实用程序,但应用它需要了解和修改您的代码结构。

这个想法是,fetchData() 应该 .take() 队列中的一个项目,如果队列为空,则会阻止执行,直到出现项目,这消除了代码中的 .wait()。数据的生产者应该.put(t) 将数据放入队列。


如果您真的需要使用waitnotify,例如为了在低级别实现并发实用程序,您可以将 Kotlin 对象强制转换为 java.lang.Object 并在之后调用这些函数,如 the language reference 中所述。或者,写成扩展函数:

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun Any.wait() = (this as java.lang.Object).wait()

【讨论】:

  • Ad BlockingQueue,在我的情况下这不起作用,因为收到通知时,我需要检查数据库中的数据。会有什么有用的东西吗?广告notify,我真的不需要,我只需要找到合适的:-)
  • @Vojtěch 也许是Semaphore
  • 创建专用的val lock=Object(),而不是演员和@Suppress,并在其上使用任何synchronized(lock){ lock.wait(); lock.notify()}。这没有编译器警告。
【解决方案3】:

您可以使用kotlin Semaphore 来暂停协程而不是阻塞线程:

val semaphore = Semaphore(1, 0)

suspend fun runFetchLoop() = withContext(Dispatchers.Default){
    while (isActive) {
        val data = fetchData()
        processData(data)
        semaphore.acquire()
        if (data.isEmpty()) {
            semaphore.acquire()
        }
        semaphore.release()
    }
}

fun notifyDataAvailable() = semaphore.release()

这是一个运行示例:https://pl.kotl.in/bnKeOspfs

但是我更喜欢 cold Flowhot Channel 来解决这个问题。这是一个很好的article,关于伟大的 Roman Elizarov 的冷流和热通道

【讨论】:

  • 我认为这个答案中的例子是错误的。有一个竞争条件。我假设,notify()fetchData() 之后被调用,返回的东西与以前不同。如果在data.isEmpty() 为真之后,但在mutex.lock() 之前调用notify(),则等待mutex.lock() 的协程将永远等待。
  • @PavelČernocký 你是对的。为了解决这个问题,我用一个信号量和两个获取调用替换了互斥锁。这将条件和暂停链接在一起。如果 notify 调用介于 isEmpty 条件和 acquire 调用之间,则不会再导致不必要的无限暂停。
  • 此修复存在其他问题。 acquire()/release() 必须成对出现,这里不能保证。尤其是当notifyDataAvailable()被多次调用时,会抛出异常,因为不会有Semaphore的许可释放。总的来说,我认为这种方法并不正确,当然不能一概而论。整个while 的构造很糟糕。 data 是由fetchData 填充的局部变量,所以fetchData 应该与notifyDataAvailable 协调。
猜你喜欢
  • 1970-01-01
  • 2018-02-04
  • 1970-01-01
  • 2018-02-13
  • 2015-04-01
  • 2018-11-12
  • 2015-11-12
  • 1970-01-01
  • 2011-10-30
相关资源
最近更新 更多