【问题标题】:exception in vert.x worker threadvert.x 工作线程中的异常
【发布时间】:2020-06-22 11:00:50
【问题描述】:

我对 vert.x 平台非常陌生。我的项目中有一个标准和一个工人 verticle,它通过 eventBus 进行通信。 Worker Verticle 在循环和数据库访问中执行多个 REST API 调用。

我的问题是工人 Verticle 在某些运行中没有问题地完成任务,但有时它会抛出以下错误。

Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed

我正在使用 kotlin 协程来处理 constructDevice(vertx: Vertx) 函数,该函数执行大部分 REST API 调用和数据库访问。

谁能告诉我上述问题的原因是什么以及有什么方法可以改进constructDevice(vertx: Vertx)函数以有效地处理多个REST API调用和MongoDB访问。

    // worker verticle to handle multiple REST API calls and MongoDB database access
    
    class DeviceDiscoverVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val consumer = vertx.eventBus().localConsumer<String>("listDevice")
            consumer.handler { message ->
                CoroutineScope(vertx.dispatcher()).launch {
                    constructDevice(vertx)
                }
                message.reply("discovered")
            }
        }
    }
    
    // standard verticle to communicate with worker verticle 
    
    class ListDeviceVerticle : CoroutineVerticle() {
        override suspend fun start() {
            val reply = awaitResult<Message<String>> { h ->
                vertx.eventBus().request("listDevice", "deviceIPs", h)
            }
            println("Reply received: ${reply.body()}")
        }
    }
    
    fun main() {
        val vertx = Vertx.vertx()
        val workOption = DeploymentOptions().setWorker(true)
        vertx.deployVerticle(DeviceDiscoverVerticle(), workOption)
        vertx.deployVerticle(ListDeviceVerticle())
    }


    suspend fun constructDevice(vertx: Vertx) {
        val deviceRepository = listOf(
            "10.22.0.106",
            "10.22.0.120",
            "10.22.0.115",
            "10.22.0.112"
        )
    
        val webClient = WebClient.create(vertx)
        val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
        val mongoClient: MongoClient = MongoClient.create(vertx, config)
        val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
        
        // loop through the IP list and calls REST endpoints
        
        val deviceList = deviceRepository.map { deviceIP ->
            val deviceIPconfig: DeviceIPconfig
            val deviceType: DeviceType
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")
    
            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                json.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }
            
            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                deviceType = json.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to json.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                jsonObjectOf()
            }
    
        }.filterNot { it.isEmpty }
        
        // construct data to upload in mongoDB
        
        val activeDeviceIDs = json {
            obj("_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") })
        }
        val activeDevices = json {
            obj("_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }
        
        // save the data in MongoDB
        
        mongoClient.save("devices", activeDeviceIDs) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
        mongoClient.save("devices", activeDevices) { res ->
            if (res.succeeded()) {
                println("saved successfully")
            } else {
                res.cause().printStackTrace()
            }
        }
    }

更新问题:1

@Damian 我已根据您的输入更新了我的问题。为了便于理解,我已经简化了上面的问题,但是当我尝试使用 promise/future 来实现这些事情时,我在某些时候遇到了困难。

我的任务是从不同的 REST 端点获取数据并从中获取 kotlin 类,我希望并行处理。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            // this will return Json array and each element of that array needs to be called again in a loop.
            val result = asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            deviceDevicesPromise.complete(result)
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {

    val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
    // I need to call other rest points similar to this and I need map the result to kotlin class.

   // how do get HTTP response out of each future request in deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>. 

}

class DeviceDiscoverVerticle : AbstractVerticle() {
        override fun start() {
            val deviceRepository = // list of IP strings
    
            val webClient = WebClient.create(vertx)
            vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
                deviceRepository.forEach { deviceIP ->
                    val futureList = constructDevice(vertx, webClient, deviceIP)
                    CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                            if (allFuturesResult.succeeded()) {
                                // how to handle individual future result here to construct data
                            } else {
                                println("failed")
                            }
                        }
                }
            }
        }

更新问题:2

@Damian 正如你所说,我已经更新了我的代码。

fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed")
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        }
        else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}


fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if(ar.succeeded()) {
            val futureList = ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
            CompositeFuture.all(futureList).onComplete { asyncResult ->
                if (asyncResult.succeeded()) {
                    asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                        //not all future in futureList are completed here some of them shows Future{unresolved}
                    }
                    constructDevicePromise.complete(DeviceFlow(label = "xyz"))
                }
                else {
                    constructDevicePromise.fail("failed")
                }
            }

        }
    }
    return constructDevicePromise.future()
}


class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = //list of IPs

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                val constructDeviceFuture = constructDevice(webClient, deviceIP)
                constructDeviceFuture.onComplete {ar ->
                    //println(ar.result().toString())
                }
            }
        }
    }
}

我的问题在里面

CompositeFuture.all(futureList).onComplete { asyncResult ->
                        if (asyncResult.succeeded()) {
                            asyncResult.result().list<HttpResponse<Buffer>>().forEach {

这里大部分future都没有解决,执行在这里挂起。

[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

所以我根据 vert.x 文档将 CompositeFuture.all(futureList).onComplete 更改为 CompositeFuture.join(futureList).onComplete,加入将等待所有未来完成

连接组合等待直到所有期货都完成,要么 成功或失败。 CompositeFuture.join 需要几个 期货参数(最多 6 个)并返回成功的未来 当所有期货都成功和失败当所有期货 已完成,其中至少有一个失败

但现在很少有期货会失败。这是更改为CompositeFuture.join后未来列表的输出

CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
                            if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
                                asyncResult.result().list<HttpResponse<Buffer>>().forEach {



[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
    [Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]

很少有期货因为我的设备无法处理并发请求而失败?还有为什么程序执行会卡在里面

asyncResult.result().list<HttpResponse<Buffer>>().forEach { 

如果设备并发请求处理存在问题,那么解决此问题的其他方法是什么。是否可以在 vertx 环境之外运行整个 rest 调用并通过事件总线与其通信?

此外,如果我将DeviceDiscoverVerticle 部署为标准verticle 而不是worker verticle,则应用程序将完全卡在CompositeFuture.all(futureList).onComplete

【问题讨论】:

  • 为什么要将它们部署为 worker=true?它旨在用于阻塞代码,vert.x 在异步操作中大放异彩(mongo 客户端和通过 vertx 的 WebClient 进行的 http 调用是异步操作的示例),阅读有关事件循环概念和反应性的更多信息
  • 如果我没有将它部署为工作者,那么它总是会抛出事件循环线程错误

标签: kotlin vert.x vertx-verticle vert.x-webclient


【解决方案1】:

了解更多您想要实现的目标,首先在方法 constructDeviceDevices() 中,我会将返回类型更改为 Future&lt;HttpResponse&lt;Buffer&gt;&gt;,如果成功则调用 deviceDevicesPromise.complete(asyncResult.result())

然后,在constructDevice() 方法中,我将调用我们修改后的constructDeviceDevices() 方法并从中获取未来对象,我们称之为Future&lt;HttpResponse&lt;Buffer&gt;&gt; httpDevicesFuture。下一步是在这个处理程序中调用httpDevicesFuture.onComplete(ar -&gt; {&lt;handler code&gt;}),您可以访问ar.result(),这是来自“.../devices/”端点的响应,所以现在在同一个块中,我将遍历该响应并获取List&lt;Future&lt;HttpResponse&lt;Buffer&gt;&gt;&gt;。仍然呆在同一个块中,我会写CompositeFuture.all(futuresList).onComplete(ar -&gt; handler) 这个ar 将是CompositeFuture 类型它有一个方法list() 实际上返回完成期货的列表(并且在这个处理程序中它们都完成了)所以现在使用该列表您可以为每个未来检索HttpResponse&lt;Buffer&gt;,每个都将是您的“.../devices/$device”响应,您可以将它们映射到您想要的任何对象。现在在同一个处理程序中,我将决定下一步我想去哪里,我可能会通过在 eventBus 上发送一条消息来做到这一点,比如eventBus.send("HTTP_PROCESSING_DONE", serializedDevices),或者如果出现问题eventBus.send("HTTP_FAILURE", someMessage)。但是在您的情况下,如果您想为某个列表中的每个 IP 执行所有这些操作,而不是强制它同步然后仍然在同一个块中,您可以进行任何对象映射并调用 constructDeviceFuture.complete(mappedObject/List&lt;MappedObject&gt;) 这意味着您必须再创造一个未来,你将从constructDevice()方法返回

基本上你被卡住了,因为你试图在异步世界中重现顺序执行,特别是当你试图从 constructDevice() 方法返回一个值时,这意味着我们实际上想要等待所有执行完成在处理这行代码时,在 vert.x 中并非如此。

它看起来像那样(语法可能是关闭的,所以将其视为伪代码)

    fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()

    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded())
            deviceDevicePromise.complete(asyncResult.result())
        else
            deviceDevicePromise.fail("Http request failed");
    }
    return deviceDevicePromise.future()
}

fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Future<HttpResponse<Buffer>> = Promise.promise()

    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else
            deviceDevicesPromise.fail("Http request failed")
    }
    return deviceDevicesPromise.future()
}

fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): Future<SomeDomainObject> {

    //Type of below promise depends on what you are mapping responses to. It may also be a list of mapped objects
    val constructDevicePromise: Promise<SomeDomainObject> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)

    httpDevicesFuture.onComplete { ar ->
        if (ar.succeeded()) {
            val futureList: List<Future<HttpResponse<Buffer>>>
            //loop through ar.result() and populate deviceDevicesFuture list

            CompositeFuture.all(futureList).onComplete { allFuturesResult ->
                if (allFuturesResult.succeeded()) {
                    // here you have access to allFuturesResult.list() method
                    // at this point you know all futures have finished, you can retrieve result from them (you may need to cast them from Object)
                    // when you have List<HttpResponse> you map it to whatever you want
                    val myMappedObject: SomeDomainObject = mappingResult()
                    constructDevicePromise.complete(myMappedObject)
                } else {
                    constructDevicePromise.fail("failed")
                }
            }
        }
    }

    return constructDevicePromise.future()
}

class DeviceDiscoverVerticle : AbstractVerticle() {
    override fun start() {
        val deviceRepository = // list of IP strings

        val webClient = WebClient.create(vertx)
        vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
            deviceRepository.forEach { deviceIP ->
                //here dependent on your logic, you handle each future alone or create a list and handle them together
                val constructDeviceFuture: Future<SomeDomainObject> = constructDevice(vertx, webClient, deviceIP)
                constructDeviceFuture.onComplete(ar -> {
                    ar.result() // <- this is your mapped object
                    eventBus.send("SOME_NEXT_LOGIC", serializedDomainObject)
                })
            }
            
            //if you need to handle all devices at once, once again you need to make CompositeFuture from all responses of constructDevice
        }
    }
}

更新 2 响应

关于CompositeFuture.all(): 你错过了一件事情,CompositeFuture.all()waits until all futures succeeds OR at least one failed. 如果即使有一个失败了,它也不会等待其他人(这就像逻辑 AND,不需要等待其余的,因为我们已经知道结果了)。另一方面,CompositeFuture.join() 只是等待所有期货完成,但如果其中任何一个失败,则由此产生的未来也将失败(但您至少应该得到所有期货的结果)。

这实际上就是您在输出中看到的内容,使用 CompositeFuture.all() 您会得到一堆已完成的 Future,一个失败,其余的未解决。

这部分还缺少一件事:

vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
        deviceRepository.forEach { deviceIP ->
            val constructDeviceFuture = constructDevice(webClient, deviceIP)
            constructDeviceFuture.onComplete {ar ->
                //println(ar.result().toString())
            }
        }
    }

你没有检查ar.succeeded(),如果你会看到最终的未来实际上是失败的,这就是为什么最终的结果不是你所期望的。

现在只是纯粹猜测正在发生的事情。您可能会(在某种程度上)杀死(在某种程度上)这个其余的 API(我假设每个 vertx 事件都是相同的 API)有这么多并发请求,如果您在单个请求处理程序中放置一些毫秒精度的日志消息,您可能应该看到请求是相隔几毫秒。我想 API 能够处理很少的请求,然后下一个请求由于某些异常/阻塞/超时或其他原因而失败,而所有其他请求可能根本没有得到响应,或者正在等待它们达到某个超时。如果您将 Verticle 定义为标准,则当任何内容持续超过两秒时,您将收到警告,此外,有一个线程处理所有这些内容,因此如果一个请求挂起很长时间,标准 Verticle 将完全没有响应那时。这可能是你被困在CompositeFuture.join() 方法的原因。

所以现在你可以做几件事:

  1. 您可以将并发执行更改为sequential execution。基本上,不是预先创建n 期货,而是为单个元素创建一个期货,然后调用future.compose(ar -&gt; {}) 这个处理程序只有在未来完成时才会被调用。然后在同一个处理程序中创建并返回下一个元素的未来。实现 imo 有点棘手,但可行(我已经使用 java stream reduce 将 x future 减少为单个)。当您以这种方式实现时,您一次将有一个请求,因此 API 应该没有问题。请注意,仍然会同时处理不同的 IP,但每个 IP 的请求将是连续的,因此它可能工作得很好。

  2. 您可以创建另一个标准 Verticle,它只响应单个事件,该事件将调用“/devices/$device”端点。现在在您现在拥有的代码中,当您循环访问初始 http 响应时,您只需向 eventBus 发送 20 个事件,而不是产生 20 个更多的 HTTP 请求。当您只有一个处理该特定消息的 Verticle 实例,并且它是一个只有一个线程的标准 Verticle 时,实际上此时应该只处理一条消息并且应该排队。这也非常容易调整,因为您可以增加 Verticle 实例的数量,并且您将拥有与 Verticle 实例数量一样多的并发请求。

  3. 您提到完全在 vertx 之外处理它,我认为这根本没有必要,但如果您认为它最适合您,那么它非常简单。如果您已经拥有来自 somwhere 的 Vertx 对象,则将该对象传递给其他类的构造函数是没有问题的。在那里,您可以拥有自己的 http 客户端、您自己的方法,基本上是您想要的任何东西,并且在某些时候,当您决定要使用 vert.x 时,您可以调用 vertx.eventBus().send() 并触发一些将由 vert 处理的逻辑。X。要记住的最重要的事情是不要创建多个Vertx 对象实例,因为它们将具有单独的事件总线。实际上正如文档所述

垂直 ... 此模型完全是可选的,如果您不想这样做,Vert.x 不会强迫您以这种方式创建应用程序。

所以你可以让你在任何框架中编写常规应用程序,并且在某些时候仍然只是实例化 Vertx 对象,执行单个任务,然后回到你的基本框架,但老实说,我认为你非常接近解决这个问题 :)

【讨论】:

  • 感谢您抽出宝贵时间回答@Damian,但我的任务仍然存在问题。我已经更新了我的问题。请看一看。
【解决方案2】:

我不熟悉 kotlin 和协程,但我可能对 vert.x 本身有一些建议。首先根据documentation

在大多数情况下,应在应用程序启动时创建一次 Web 客户端,然后重复使用。否则你会失去很多好处,比如连接池,如果实例没有正确关闭,可能会泄漏资源。

我看到你在constructDevice方法中调用了Webclient.create(vertx),所以你每次发送'listDevice'事件时都会创建新的WebClient,所以你可以考虑改变它。

我最近有类似的事情要做,最后使用了Futures。请注意,当您调用 awaitResult 时,您正在阻塞线程以等待异步执行,如果这是标准 Verticle,您确实会收到阻塞线程警告的垃圾邮件。你可以做的是创建一个promise,在你的http处理程序中完成/失败,在处理程序之外你只需返回promise.future()对象。在循环之外你可以处理所有的futures,不同的是futures处理也是异步的,所以你不会阻塞线程。

此外,为了使代码更简洁并利用 vert.x 的异步特性,最好将 http 和 mongo 处理拆分为单独的 verticle i。 e.

  1. HttpVerticle 获取 listDevice 事件
  2. HttpVerticle 为 5 个不同的请求创建 5 个未来
  3. 当所有期货完成时,future.onComplete()/compositeFuture.all() 被触发并发送“updateDB”事件
  4. MongoVerticle 接收并处理“updateDB”事件

这里可能没有解决您的具体问题,但我希望它能引导您至少更进一步

在评论之后这是一个 java 中的期货示例

public class HttpVerticle extends AbstractVerticle {

WebClient webClient;

@Override
public void start() throws Exception {

    webClient = WebClient.create(vertx);

    vertx.eventBus().consumer("run_multiple_requests", event -> {
        //When event is received this block is handled by some thread from worker pool, let's call it 'main thread'
        Promise<HttpResponse<Buffer>> request1Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request2Promise = Promise.promise();
        Promise<HttpResponse<Buffer>> request3Promise = Promise.promise();

        //Since webclient is async, all calls will be asynchronous
        webClient.get("ip1", "/endpoint")
                .send(asyncResult -> {
                    //async block #1 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request1Promise.complete(asyncResult.result());
                    } else {
                        request1Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 is probably still processing
        webClient.get("ip2", "/endpoint")
                .send(asyncResult -> {
                    //async block #2 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request2Promise.complete(asyncResult.result());
                    } else {
                        request2Promise.fail("Http request failed");
                    }
                });

        //at this point async block #1 and #2 are probably still processing
        webClient.get("ip3", "/endpoint")
                .send(asyncResult -> {
                    //async block #3 if it's worker verticle, it's probably picked up by another thread
                    //here we specify that our promise finished or failed
                    if (asyncResult.succeeded()) {
                        request3Promise.complete(asyncResult.result());
                    } else {
                        request3Promise.fail("Http request failed");
                    }
                });

        //retrieving futures from promises
        Future<HttpResponse<Buffer>> future1 = request1Promise.future();
        Future<HttpResponse<Buffer>> future2 = request2Promise.future();
        Future<HttpResponse<Buffer>> future3 = request3Promise.future();

       
        CompositeFuture.all(future1, future2, future3).onComplete(allFuturesResult -> {
            //async block #4 this will be executed only when all futures complete, but since it's async it does
            // not block our 'main thread'
            if (allFuturesResult.succeeded()) {
                //all requests succeeded
                vertx.eventBus().send("update_mongo", someMessage);
            } else {
                //some of the requests failed, handle it here
            }
        });
        
        //at this point async block #1 #2 #3 are probably still processing and #4 is waiting for callback
        //but we leave our event handler and free 'main thread' without waiting for anything
    });
}

当然,这段代码可以(并且应该)更短,为了清晰起见,所有这些都是硬编码的,没有任何数组和循环

如果您使用 logback 或 log4j(其他可能也可以),您可以将 [%t] 放在日志模式中,它会在日志消息中显示线程名称,对我个人而言,了解所有这些异步的流程真的很有帮助块

还有一点,通过这种设置,所有三个请求实际上将同时发送,因此请确保 http 服务器能够同时处理多个请求。

【讨论】:

  • 你可以做的是创建一个承诺,在你的 http 处理程序中完成/失败它,在处理程序之外你只返回 promise.future() 对象。 能否请你提供一个简单的例子,我看不懂
  • 如果你愿意,你可以,但你实际上并不需要。您仍然可以在constructDevice() 方法中循环遍历所有 ip,并以与CompositeFuture.all() 相同的方法处理它们。如果您这样做,您的 constructDevice() 将完成并释放线程,在引擎盖下 vert.x 将等待所有期货完成,然后最终调用在 CompositeFuture.all() 中定义的处理程序,即使您理论上已经离开 @987654328 @ 方法。了解正在发生的事情的最简单方法就是到处扔一堆日志消息,然后查看它们的执行顺序。
  • 我已经更新了我的问题。我认为在我的任务中使用 future 是相当困难的
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-10-06
  • 2011-03-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-15
相关资源
最近更新 更多