【发布时间】:2020-06-22 11:00:50
【问题描述】:
我刚接触 Vert.x 平台。我的项目中有一个标准 verticle 和一个 worker verticle,它们通过 eventBus 进行通信。Worker verticle 在循环中执行多个 REST API 调用,并访问数据库。
我的问题是:worker verticle 在某些运行中可以顺利完成任务,但有时会抛出以下错误。
Exception in thread "vert.x-worker-thread-12" io.vertx.core.VertxException: Connection was closed
我正在使用 kotlin 协程来处理 constructDevice(vertx: Vertx) 函数,该函数执行大部分 REST API 调用和数据库访问。
谁能告诉我上述问题的原因是什么以及有什么方法可以改进constructDevice(vertx: Vertx)函数以有效地处理多个REST API调用和MongoDB访问。
/**
 * Worker verticle that handles multiple REST API calls and MongoDB access.
 *
 * It listens on the local event-bus address "listDevice". For every message it
 * runs [constructDevice] and replies "discovered" only after that call has
 * finished, so the reply really means the discovery ran to completion.
 * Requesters must therefore allow enough reply timeout for a full discovery run.
 */
class DeviceDiscoverVerticle : CoroutineVerticle() {
    override suspend fun start() {
        val consumer = vertx.eventBus().localConsumer<String>("listDevice")
        consumer.handler { message ->
            // CoroutineVerticle is itself a CoroutineScope, so launch in the
            // verticle's own scope instead of creating an unrelated one per message.
            launch {
                try {
                    constructDevice(vertx)
                    message.reply("discovered")
                } catch (e: Exception) {
                    // Tell the requester instead of leaving it waiting for a timeout,
                    // then rethrow so the error (or cancellation) is still surfaced.
                    message.fail(500, e.message ?: e.toString())
                    throw e
                }
            }
        }
    }
}
/**
 * Standard verticle that talks to the worker verticle: on start it sends
 * "deviceIPs" to the "listDevice" address and prints the body of the reply.
 */
class ListDeviceVerticle : CoroutineVerticle() {
    override suspend fun start() {
        val eventBus = vertx.eventBus()
        // Suspend until the reply (or a failure) for the request arrives.
        val answer: Message<String> = awaitResult { replyHandler ->
            eventBus.request("listDevice", "deviceIPs", replyHandler)
        }
        println("Reply received: ${answer.body()}")
    }
}
/**
 * Application entry point: deploys [DeviceDiscoverVerticle] as a worker and,
 * once that deployment has succeeded, deploys [ListDeviceVerticle].
 *
 * The deployments are sequenced because [ListDeviceVerticle] sends its request
 * as soon as it starts; if the "listDevice" consumer is not registered yet the
 * request fails. Deployment failures are printed instead of being ignored.
 */
fun main() {
    val vertx = Vertx.vertx()
    val workOption = DeploymentOptions().setWorker(true)
    vertx.deployVerticle(DeviceDiscoverVerticle(), workOption) { workerDeployment ->
        if (workerDeployment.succeeded()) {
            vertx.deployVerticle(ListDeviceVerticle()) { requesterDeployment ->
                if (requesterDeployment.failed()) {
                    requesterDeployment.cause().printStackTrace()
                }
            }
        } else {
            workerDeployment.cause().printStackTrace()
        }
    }
}
/**
 * Queries a fixed list of device IPs over REST, builds one JSON document per
 * device that answered both endpoints with status 200, and saves two summary
 * documents ("activeDeviceIDs" and "activeDevices") to the "devices" collection.
 *
 * The function suspends until both saves have completed. The web client and
 * Mongo client created here are always closed before returning, so repeated
 * calls do not accumulate open connections.
 *
 * @param vertx the Vert.x instance used to create the web and Mongo clients
 * @throws Throwable the failure cause of any HTTP request or Mongo save
 */
suspend fun constructDevice(vertx: Vertx) {
    val deviceRepository = listOf(
        "10.22.0.106",
        "10.22.0.120",
        "10.22.0.115",
        "10.22.0.112"
    )
    val webClient = WebClient.create(vertx)
    val config = json { obj("db_name" to "mnSet", "connection_string" to "mongodb://localhost:27017") }
    val mongoClient: MongoClient = MongoClient.create(vertx, config)
    // Deliberately not named `json`, so it cannot be confused with the Vert.x `json { }` builder.
    val serialFormat = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true))
    try {
        // Loop through the IP list and call the REST endpoints of each device.
        val deviceList = deviceRepository.map { deviceIP ->
            val requestDeviceIP: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/ipconfig/")
            val requestDeviceType: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/information/")

            val responseDeviceIP = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceIP.send(handler)
            }
            val deviceIPconfig = if (responseDeviceIP.statusCode() == 200) {
                serialFormat.parse(DeviceIPconfig.serializer(), responseDeviceIP.bodyAsString())
            } else {
                println("request to device $deviceIP failed with ${responseDeviceIP.statusCode()}")
                DeviceIPconfig()
            }

            val responseDeviceType = awaitResult<HttpResponse<Buffer>> { handler ->
                requestDeviceType.send(handler)
            }
            if (responseDeviceType.statusCode() == 200) {
                val deviceType = serialFormat.parse(DeviceType.serializer(), responseDeviceType.bodyAsString())
                val device = DeviceModel(deviceIPconfig, deviceType)
                json {
                    obj(
                        "_id" to deviceIPconfig.localMac,
                        "device" to serialFormat.stringify(DeviceModel.serializer(), device)
                    )
                }
            } else {
                println("request to device $deviceIP failed with ${responseDeviceType.statusCode()}")
                // An empty object marks a device to skip; it is filtered out below.
                jsonObjectOf()
            }
        }.filterNot { it.isEmpty }

        // Construct the data to upload to MongoDB.
        val activeDeviceIDs = json {
            obj(
                "_id" to "activeDeviceIDs",
                "activeDeviceIDs" to deviceList.map { it.get<String>("_id") }
            )
        }
        val activeDevices = json {
            obj(
                "_id" to "activeDevices",
                "activeDevices" to json { array(deviceList) }
            )
        }

        // Save the data in MongoDB and wait for each write, so the clients are
        // not closed (and the caller is not resumed) while a save is in flight.
        awaitResult<String?> { handler -> mongoClient.save("devices", activeDeviceIDs, handler) }
        println("saved successfully")
        awaitResult<String?> { handler -> mongoClient.save("devices", activeDevices, handler) }
        println("saved successfully")
    } finally {
        mongoClient.close()
        webClient.close()
    }
}
更新问题:1
@Damian 我已根据您的输入更新了我的问题。为了便于理解,我已经简化了上面的问题,但是当我尝试使用 promise/future 来实现这些事情时,我在某些时候遇到了困难。
我的任务是从不同的 REST 端点获取数据并从中获取 kotlin 类,我希望并行处理。
/**
 * Requests the diagnostics of a single device entry.
 *
 * @param deviceIP host of the device to query
 * @param device identifier appended to the diagnostics path
 * @param webClient client used to send the request
 * @return a future completed with the HTTP response, or failed with
 *         "Http request failed" carrying the original error as its cause
 */
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicePromise.complete(asyncResult.result())
        } else {
            // Keep the message, but attach the real cause so the failure can be diagnosed.
            deviceDevicePromise.fail(RuntimeException("Http request failed", asyncResult.cause()))
        }
    }
    return deviceDevicePromise.future()
}
/**
 * Requests the list of device entries and starts one follow-up request per entry.
 *
 * @param deviceIP host of the device to query
 * @param webClient client used to send the requests
 * @return a future completed with the list of per-entry response futures; it
 *         fails if the listing request fails or its body cannot be read as a
 *         JSON array
 */
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<List<Future<HttpResponse<Buffer>>>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/self/diag/devices/")
    val deviceDevicesPromise: Promise<List<Future<HttpResponse<Buffer>>>> = Promise.promise()
    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.failed()) {
            // Keep the message, but attach the real cause so the failure can be diagnosed.
            deviceDevicesPromise.fail(RuntimeException("Http request failed", asyncResult.cause()))
            return@send
        }
        // The response is a JSON array; each element needs its own request.
        // Decoding can throw (missing or non-array body); without this guard the
        // exception would escape the handler and the promise would never complete.
        val result = try {
            asyncResult.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
        } catch (e: Exception) {
            deviceDevicesPromise.fail(e)
            return@send
        }
        deviceDevicesPromise.complete(result)
    }
    return deviceDevicesPromise.future()
}
/**
 * Starts discovery for one device by calling [constructDeviceDevices].
 *
 * NOTE(review): this body is an unfinished placeholder. It declares a List
 * return type but contains no return statement, and the value it obtains is a
 * Future of a list, which is not available synchronously at this point.
 */
fun constructDevice(vertx: Vertx, webClient: WebClient, deviceIP: String): List<Future<HttpResponse<Buffer>>> {
val deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>> = constructDeviceDevices(deviceIP, webClient)
// Open point: other REST endpoints need to be called in the same way, and the results mapped to Kotlin classes.
// Open point: how to get the HTTP response out of each request future inside
// deviceDevicesFuture: Future<List<Future<HttpResponse<Buffer>>>>.
}
/**
 * Verticle that, for every message on the local "listDevice" address, runs
 * [constructDevice] for each configured device IP and waits for all returned
 * futures with CompositeFuture.all.
 *
 * NOTE(review): this excerpt is pseudo-code. The deviceRepository initializer is
 * a placeholder and the closing brace of the class is missing. The incoming
 * message is never replied to.
 */
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // placeholder: list of IP strings
// A single web client is created once and shared by all requests.
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val futureList = constructDevice(vertx, webClient, deviceIP)
CompositeFuture.all(futureList).onComplete { allFuturesResult ->
if (allFuturesResult.succeeded()) {
// Open point: how to handle each individual future result here to construct the data.
} else {
println("failed")
}
}
}
}
}
更新问题:2
@Damian 正如你所说,我已经更新了我的代码。
/**
 * Requests a single flow entry from a device.
 *
 * @param deviceIP host of the device to query
 * @param device identifier appended to the flows path
 * @param webClient client used to send the request
 * @return a future completed with the HTTP response, or failed with
 *         "Http request failed" carrying the original error as its cause
 */
fun constructDeviceDevice(deviceIP: String, device: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/$device")
    val deviceDevicePromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicePromise.complete(asyncResult.result())
        } else {
            // Keep the message, but attach the real cause so the failure can be diagnosed.
            deviceDevicePromise.fail(RuntimeException("Http request failed", asyncResult.cause()))
        }
    }
    return deviceDevicePromise.future()
}
/**
 * Requests the list of flows from a device.
 *
 * @param deviceIP host of the device to query
 * @param webClient client used to send the request
 * @return a future completed with the HTTP response, or failed with
 *         "Http request failed" carrying the original error as its cause
 */
fun constructDeviceDevices(deviceIP: String, webClient: WebClient): Future<HttpResponse<Buffer>> {
    val requestDeviceDevices: HttpRequest<Buffer> = webClient.get(deviceIP, "emsfp/node/v1/flows/")
    val deviceDevicesPromise: Promise<HttpResponse<Buffer>> = Promise.promise()
    requestDeviceDevices.send { asyncResult ->
        if (asyncResult.succeeded()) {
            deviceDevicesPromise.complete(asyncResult.result())
        } else {
            // Keep the message, but attach the real cause so the failure can be diagnosed.
            deviceDevicesPromise.fail(RuntimeException("Http request failed", asyncResult.cause()))
        }
    }
    return deviceDevicesPromise.future()
}
/**
 * Fetches the flow list of one device, then fetches every listed flow, and
 * completes with a [DeviceFlow] once all flow requests have succeeded.
 *
 * The returned future is always completed: it fails when the listing request
 * fails, when the listing body cannot be read as a JSON array, or when any of
 * the per-flow requests fails.
 *
 * @param webClient client used to send the requests
 * @param deviceIP host of the device to query
 * @return a future holding the constructed [DeviceFlow] or the failure
 */
fun constructDevice(webClient: WebClient, deviceIP: String): Future<DeviceFlow> {
    // Presumably intended for decoding each response in the loop below; currently unused.
    val json = Json(JsonConfiguration.Stable.copy(ignoreUnknownKeys = true, isLenient = true))
    val constructDevicePromise: Promise<DeviceFlow> = Promise.promise()
    val httpDevicesFuture: Future<HttpResponse<Buffer>> = constructDeviceDevices(deviceIP, webClient)
    httpDevicesFuture.onComplete { ar ->
        if (ar.failed()) {
            // Without this branch a failed listing request left the promise
            // unresolved and the caller waiting forever.
            constructDevicePromise.fail(ar.cause())
            return@onComplete
        }
        // Decoding can throw (missing or non-array body); fail the promise
        // instead of letting the exception escape the handler.
        val futureList = try {
            ar.result().bodyAsJsonArray().map { device ->
                constructDeviceDevice(deviceIP, device.toString(), webClient)
            }
        } catch (e: Exception) {
            constructDevicePromise.fail(e)
            return@onComplete
        }
        CompositeFuture.all(futureList).onComplete { asyncResult ->
            if (asyncResult.succeeded()) {
                asyncResult.result().list<HttpResponse<Buffer>>().forEach { res ->
                    // Placeholder: per-response processing goes here.
                }
                constructDevicePromise.complete(DeviceFlow(label = "xyz"))
            } else {
                // Keep the message, but attach the real cause so the failure can be diagnosed.
                constructDevicePromise.fail(RuntimeException("failed", asyncResult.cause()))
            }
        }
    }
    return constructDevicePromise.future()
}
/**
 * Verticle that, for every message on the local "listDevice" address, starts
 * [constructDevice] for each configured device IP.
 *
 * NOTE(review): this excerpt is pseudo-code. The deviceRepository initializer is
 * a placeholder, the completion handler does nothing with the result, and the
 * incoming message is never replied to.
 */
class DeviceDiscoverVerticle : AbstractVerticle() {
override fun start() {
val deviceRepository = // placeholder: list of IPs
// A single web client is created once and shared by all requests.
val webClient = WebClient.create(vertx)
vertx.eventBus().localConsumer<String>("listDevice").handler { message ->
deviceRepository.forEach { deviceIP ->
val constructDeviceFuture = constructDevice(webClient, deviceIP)
constructDeviceFuture.onComplete {ar ->
// Debug output, disabled: println(ar.result().toString())
}
}
}
}
}
我的问题在里面
CompositeFuture.all(futureList).onComplete { asyncResult ->
if (asyncResult.succeeded()) {
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
这里大部分 Future 都处于未完成(unresolved)状态,执行在这里挂起。
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@67d2e79}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@8bad0c6}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@c854509}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
所以我根据 Vert.x 文档将 CompositeFuture.all(futureList).onComplete 改为 CompositeFuture.join(futureList).onComplete,因为 join 会等待所有 Future 完成:
join 组合会一直等待,直到所有 Future 都完成,无论成功还是失败。CompositeFuture.join 接受多个 Future 参数(最多 6 个)并返回一个 Future:当所有 Future 都成功时,它成功;当所有 Future 都已完成且其中至少有一个失败时,它失败。
但现在有少数 Future 会失败。以下是改为 CompositeFuture.join 后 Future 列表的输出:
CompositeFuture.join(futureList).onComplete { asyncResult ->
println(futureList)
if (asyncResult.succeeded()) { res ->
// println(res) this one gets hanged and not printing all response
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5e9d3832}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@379c326a}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@51a39962}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@edcd528}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@293c3e5c}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5f86d3ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@12a329f7}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@7abedb1e}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@3238d4cb}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5bc868d3}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@50af1ecc}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@5cc549ec}, Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@282f4033}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@41a890b3}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
[Future{result=io.vertx.ext.web.client.impl.HttpResponseImpl@147d772a}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{cause=Http request failed}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}, Future{unresolved}]
少数 Future 失败,是因为我的设备无法处理并发请求吗?另外,为什么程序执行会卡在下面这段代码里:
asyncResult.result().list<HttpResponse<Buffer>>().forEach {
如果设备并发请求处理存在问题,那么解决此问题的其他方法是什么。是否可以在 vertx 环境之外运行整个 rest 调用并通过事件总线与其通信?
此外,如果我将DeviceDiscoverVerticle 部署为标准verticle 而不是worker verticle,则应用程序将完全卡在CompositeFuture.all(futureList).onComplete。
【问题讨论】:
-
为什么要将它们部署为 worker=true?它旨在用于阻塞代码,vert.x 在异步操作中大放异彩(mongo 客户端和通过 vertx 的 WebClient 进行的 http 调用是异步操作的示例),阅读有关事件循环概念和反应性的更多信息
-
如果我不把它部署为 worker verticle,它总是会抛出事件循环线程相关的错误
标签: kotlin vert.x vertx-verticle vert.x-webclient