【问题标题】:Apache Flink CEP pattern of multiple event types多种事件类型的 Apache Flink CEP 模式
【发布时间】:2019-06-10 21:13:38
【问题描述】:

目前我正在做一个学期项目,我必须识别三个事件的系列。赞P -> R -> P

我们有两种不同的事件类型,它们通过同一主题中的 Kafka 连接器使用。

我创建了一个名为 Event 的父类,其他两种类型都从该类派生。

Kafka 连接器将带有 EventSchema 的 JSON 反序列化到父类 Event。

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

图案如下所示:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

目前的问题是,如果我发送三个格式合适的消息,模式将无法被识别。

我尝试过的其他方法.. 我改变了这样的模式:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

此模式有效,但后来我无法将 Event 类转换为位置或识别..

我错过了什么?

【问题讨论】:

  • 也许你传递给模式的元素是事件?
  • 是的,但是不能有不同类型的事件,从事件时间升序排列,找到里面的模式吗?如果所有事件都来自一个主题,或者每个事件都有自己的主题,则不应该提出观点..
  • 从kafka反序列化的时候有没有用子类型初始化对象?
  • 我只是用val kafkaSource = new FlinkKafkaConsumer("sp", new EventSchema, properties)将它序列化为事件,因为运行时多种类型都在一个主题中。但是我可以将多个不同类型的kafkaSources合并为一个吗?
  • 可以把EventSchema的代码放在这里吗?我按照你的描述试过了,还是可以的。

标签: apache-kafka apache-flink flink-cep complex-event-processing


【解决方案1】:

根据 cmets,我认为您应该返回子类型实例而不是事件。这是我给你的示例代码:

val event = mapper.readValue(bytes, classOf[Event])
event.getType match {
  case "position" => mapper.readValue(bytes, classOf[Position])
  case "recognition" => mapper.readValue(bytes, classOf[Recognized])
  case _ =>
}

我成功地从 CEPITCase.java 中的一个测试用例中尝试了这个示例。

DataStream<Event> input = env.fromElements(
  new Event(1, "foo", 4.0),
  new SubEvent(2, "foo", 4.0, 1.0),
  new SubEvent(3, "foo", 4.0, 1.0),
  new SubEvent(4, "foo", 4.0, 1.0),
  new Event(5, "middle", 5.0)
);

Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
.followedByAny("middle").subtype(SubEvent.class)
.followedByAny("end").subtype(SubEvent.class);

【讨论】:

  • 你是个天才!只有一件事你的例子不起作用..当我使用asInstanceOf时,会发生一个异常,我无法将事件转换为位置..因此,我与mapper.readValue(bytes, classOf[Position])交换了这个并重新序列化它。如果您可以在帖子中编辑此内容,我将接受作为答案!再次感谢您!
  • @DanielEisenreich 这是你需要的改变吗?
猜你喜欢
  • 1970-01-01
  • 2022-08-19
  • 2016-12-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多