【问题标题】:How to use Kotlin coroutines with two for loops and channels that update each other?如何将 Kotlin 协程与两个相互更新的 for 循环和通道一起使用?
【发布时间】:2022-10-02 02:09:47
【问题描述】:

我正在尝试通过构建由this (gopl.io/ch8/crawl3) 建模的网络爬虫来了解有关 Kotlin 协程和通道的更多信息

这个想法是启动一组协程,它们循环通过链接通道访问linksToVisit,并将所有找到的链接推送到另一个通道foundLinks

然后一个单独的 for 循环遍历 foundLinks,检查它们是否已经被访问,如果没有,则将它们推回 linksToVisit 以供协程拾取。

到目前为止,我的代码似乎可以正确访问所有链接,但不会终止 - 一旦访问了所有链接,它就会挂起。我做错了什么,是否可以以这种方式使用 for 循环?

这是代码:

fun crawl(startUrl: String) = runBlocking(CoroutineScope(Dispatchers.IO).coroutineContext) {

    val linksToVisit = Channel<String>()
    launch { linksToVisit.send(startUrl) }
    val foundLinks = Channel<List<String>>()

    repeat(20) {
        launch {
            for (channel in linksToVisit) {
                val links = findLinks(channel)
                launch { foundLinks.send(links) }
            }
        }
    }

    val visitedLinks = mutableMapOf<String, Boolean>()

    for (links in foundLinks) {
        for (link in links) {
            if (!visitedLinks.contains(link)) {
                visitedLinks[link] = true
                linksToVisit.send(link)
            }
        }
    }
}

其中findLinks(channel) 检索页面(使用JSoup)并返回找到的链接列表。

附带问题:JSoup 与协程兼容吗?

    标签: kotlin kotlin-coroutines channel


    【解决方案1】:

    一旦没有更多链接需要处理,您需要在频道上致电close()。一旦通道关闭并接收到所有剩余元素,for 循环将完成。

    【讨论】:

    • 感谢您的答复!我尝试添加close(),但似乎无法破解正确的位置/条件 - 我要么遇到异常,要么它仍然挂起。我怎么知道什么时候没有更多的链接需要处理?假设我还需要关闭 linksToVisit? (如果有一个 for 循环和通道似乎更明显,但是有两个让我感到困惑,因为它们都相互馈送 - 就像一个 catch 22!)
    【解决方案2】:

    对于爬虫用例,使用close() 是不可能的。您需要一种启发式方法:如果您的页面通道为空,则定义您愿意等待的超时。

    这是一个非常原始的实现模板:

        private val lastAction = AtomicLong(System.currentTimeMillis())
        private val pageChannel = Channel<String>(Channel.UNLIMITED)
    
        fun startCrawl(root: String) {
            runBlocking<Unit> {
                GlobalScope.launch(Dispatchers.Default) { 
                    pageChannel.send(root) 
                }
                var current = System.currentTimeMillis()
                do {
                    GlobalScope.launch(Dispatchers.Default) {
                        crawl()
                    }
                    if (pageChannel.isEmpty) delay(1000)
                    current = System.currentTimeMillis()
                } while (current - lastAction.get() <= 5000)
            }
        }
    
        suspend fun crawl() {
            val page = pageChannel.receive()
            logger.info("Processing ${page}")
            lastAction.set(System.currentTimeMillis())
            // TODO ...
            // if you find find more pages you send them to pageChannel
        }
    

    您可以在krawler 中找到更多详细信息,这是一个用基于协程的 kotlin 编写的小型爬虫库。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-10-02
      • 1970-01-01
      • 2020-04-12
      • 2023-03-27
      • 1970-01-01
      • 2021-06-18
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多