【Posted】: 2018-11-09 21:27:36
【Question】:
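We have a prototype running Esper, but its performance is quite poor. I assume this is my fault rather than a problem with Esper itself, so I am looking for help in finding where my performance problem lies.
I am running a single instance of the Esper service with the following memory limits: -Xmx6G -Xms1G (I have tried various combinations of these values). It has 4 CPU cores available. No other services are running during these tests, only Esper, Kafka and ZooKeeper.
I am using Akka Streams to stream events into Esper. The service is very simple: it streams from Kafka and inserts the events into the Esper runtime. Esper has 3 EPStatements registered, all tested and working. There is one listener, which I add to all 3 statements; the listener writes the matching events to Kafka.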
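Some things I have tried in order to find where the performance problem lies:
- Removing some of the EPStatements
- Removing all of the EPStatements
- Removing the listener
- Removing both the EPStatements and the listener
- Removing the Esper .sendEvent(...) call (this improves performance significantly, so it appears to be an Esper issue rather than an Akka issue)
Only item 4 above produced any significant observable performance benefit.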
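To take Akka and Kafka out of the picture entirely, the cost of the send call could also be timed on its own. The following is only a minimal sketch in plain Scala: `SendEventTiming` and `eventsPerSecond` are hypothetical names introduced here, and the function being timed is passed in, so nothing about Esper's API is assumed.

```scala
object SendEventTiming {
  // Calls `send` once per event on the current thread and returns
  // the observed rate in events per second.
  def eventsPerSecond[A](events: Seq[A])(send: A => Unit): Double = {
    val start = System.nanoTime()
    events.foreach(send)
    // Guard against a zero elapsed time on very small inputs.
    val elapsedNanos = math.max(System.nanoTime() - start, 1L)
    events.size.toDouble * 1e9 / elapsedNanos
  }
}
```

With a pre-parsed batch of events (here a hypothetical `sampleEvents`), this could be called as `SendEventTiming.eventsPerSecond(sampleEvents)(e => engine.getEPRuntime.sendEvent(e))` and the result compared against the rate seen through the full pipeline.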
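Below is a sample query that we run through Esper. It has been tested and works. I have read the performance tuning section of the documentation, and the query looks fine to me. All of my queries follow a similar format: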
select * from EsperEvent#time(5 minutes)
match_recognize (
  partition by asset_id
  measures A as event1, B as event2, C as event3
  pattern (A Z* B Z* C)
  interval 10 seconds or terminated
  define
    A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
    B as B.eventtype = 13 AND B.win_EventID = "4672",
    C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045")
)
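As a rough illustration of the sequence this query looks for, the sketch below tracks, per asset, an A event followed by a B event followed by a C event, skipping anything in between. It is a deliberate simplification in plain Scala and is not how Esper evaluates match_recognize: it ignores the 5-minute window and the 10-second interval, and it keeps only the first candidate per asset. `SampleEvent`, its camel-case field names and `SequenceSketch` are hypothetical stand-ins for the real event class.

```scala
// Hypothetical stand-in for the real event class; fields mirror the query's properties.
final case class SampleEvent(assetId: String, eventtype: Int, winEventId: String, winLogonType: String)

object SequenceSketch {
  private def isA(e: SampleEvent) = e.eventtype == 13 && e.winEventId == "4624" && e.winLogonType == "3"
  private def isB(e: SampleEvent) = e.eventtype == 13 && e.winEventId == "4672"
  private def isC(e: SampleEvent) = e.eventtype == 13 && (e.winEventId == "4697" || e.winEventId == "7045")

  // Returns one (A, B, C) triple per completed sequence, tracked separately per asset.
  def matches(events: Seq[SampleEvent]): List[(SampleEvent, SampleEvent, SampleEvent)] = {
    // Per-asset progress: the events matched so far (A, or A then B).
    val progress = scala.collection.mutable.Map.empty[String, List[SampleEvent]]
    val found = List.newBuilder[(SampleEvent, SampleEvent, SampleEvent)]
    events.foreach { e =>
      progress.getOrElse(e.assetId, Nil) match {
        case Nil if isA(e)           => progress(e.assetId) = List(e)
        case a :: Nil if isB(e)      => progress(e.assetId) = List(a, e)
        case a :: b :: Nil if isC(e) =>
          found += ((a, b, e))
          progress.remove(e.assetId)
        case _ => () // plays the role of Z: any other event is skipped
      }
    }
    found.result()
  }
}
```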
Some code..
Here is my Akka stream:
kafkaConsumer
  .via(parsing) // Parse the JSON event to a POJO for Esper. Have tried without this step also, no performance impact
  .via(esperFlow) // mapAsync call to sendEvent(...)
  // Here I am using Kafka to measure the flow throughput rate. This is where I establish my throughput rate, based on the rate messages are written to the "esper_flow_through" topic.
  .map(rec => new ProducerRecord[Array[Byte], String]("esper_flow_through", Serialization.write(rec)))
  .runWith(sink)
esperFlow (default parallelism = 4):
val esperFlow = Flow[EsperEvent]
  .mapAsync(Parallelism)(event => Future {
    engine.getEPRuntime.sendEvent(event)
    event
  })
Listener:
override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean], statement: EPStatement, epServiceProvider: EPServiceProvider): Unit = Future {
  logger.info(s"Received Listener updates: Query Name: ${statement.getName} ---- ${newEvents.map(_.getUnderlying)}, $oldEvents")
  statement.getName match {
    case "SERVICE_INSTALL" => serviceInstall.increment(newEvents.length)
    case "ADMIN_GROUP" => adminGroup.increment(newEvents.length)
    case "SMB_SHARE" => smbShare.increment(newEvents.length)
  }
  newEvents.map(_.getUnderlying.toString).toList
    .foreach(queryMatch => {
      val record: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String]("esper_output", queryMatch)
      producer.send(record)
    })
}
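Performance observations:
- The input stream rate is about 2.4k events per second.
- Esper cannot keep up from the very start. It peaks at about 600 per second.
- Esper's throughput gradually degrades.
- Eventually Esper's throughput levels off at
This rate seems low, so I assume I am missing something in the Esper configuration?
Our target throughput is about 10k per second. We are a long way from that target, and we have a similar POC in Spark that gets much closer to it.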
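Update:
Following @user650839's comments, I was able to raise the throughput to a steady 1k per second. Both of the following queries produce the same throughput: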
select * from EsperEvent(eventtype = 13 and win_EventID in ("4624", "4672", "4697", "7045"))#time(5 minutes)
match_recognize (
  partition by asset_id
  measures A as event1, B as event2, C as event3
  pattern (A B C)
  interval 10 seconds or terminated
  define
    A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
    B as B.eventtype = 13 AND B.win_EventID = "4672",
    C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045"))

create context NetworkLogonThenInstallationOfANewService
  start EsperEvent(eventtype = 13 AND win_EventID = "4624" AND win_LogonType = "3")
  end pattern [
    b=EsperEvent(eventtype = 13 AND win_EventID = "4672") ->
    c=EsperEvent(eventtype = 13 AND (win_EventID = "4697" OR win_EventID = "7045"))
    where timer:within(5 minutes)
  ]
context NetworkLogonThenInstallationOfANewService select * from EsperEvent output when terminated
However, 1k per second is still too slow for our needs.
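For reference, the stream-level filter in the first updated query means that only events with eventtype 13 and one of four event IDs ever enter the time window. The predicate below restates that filter in plain Scala purely as an illustration. `LogEvent`, `PreFilterSketch` and `entersWindow` are hypothetical names, and the sketch does not touch Esper.

```scala
// Hypothetical stand-in for the real event class; only the filtered fields are modelled.
final case class LogEvent(eventtype: Int, winEventId: String)

object PreFilterSketch {
  // The four event IDs listed in the query's `in (...)` clause.
  private val relevantIds = Set("4624", "4672", "4697", "7045")

  // Mirrors EsperEvent(eventtype = 13 and win_EventID in (...)):
  // an event failing this test never reaches the 5-minute window.
  def entersWindow(e: LogEvent): Boolean =
    e.eventtype == 13 && relevantIds.contains(e.winEventId)
}
```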
Tags: performance esper