【发布时间】:2019-06-10 21:13:38
【问题描述】:
目前我正在做一个学期项目,我必须识别三个事件的系列。赞P -> R -> P
我们有两种不同的事件类型,它们通过同一主题中的 Kafka 连接器使用。
我创建了一个名为 Event 的父类,其他两种类型都从该类派生。
Kafka 连接器将带有 EventSchema 的 JSON 反序列化到父类 Event。
val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)
图案如下所示:
val pattern = Pattern
.begin[Event]("before")
.subtype(classOf[Position])
.next("recognized")
.subtype(classOf[Recognized])
.next("after")
.subtype(classOf[Position])
目前的问题是,如果我发送三个格式合适的消息,模式将无法被识别。
我尝试过的其他方法.. 我改变了这样的模式:
val pattern = Pattern
.begin[Event]("before")
.where(e => e.getType == "position")
.next("recognized")
.where(e => e.getType == "recognition")
.next("after")
.where(e => e.getType == "position")
此模式有效,但后来我无法将 Event 类转换为位置或识别..
我错过了什么?
【问题讨论】:
-
也许你传递给模式的元素是事件?
-
是的,但是不能有不同类型的事件,从事件时间升序排列,找到里面的模式吗?如果所有事件都来自一个主题,或者每个事件都有自己的主题,则不应该提出观点..
-
从kafka反序列化的时候有没有用子类型初始化对象?
-
我只是用
val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties)将它序列化为事件,因为运行时多种类型都在一个主题中。但是我可以将多个不同类型的kafkaSources合并为一个吗? -
可以把EventSchema的代码放在这里吗?我按照你的描述试过了,还是可以的。
标签: apache-kafka apache-flink flink-cep complex-event-processing