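【Posted】: 2017-11-24 15:29:30
【Question】:
I am getting Exception in thread "main" java.lang.IndexOutOfBoundsException in Apache Flink. I tried to follow the example here, but I get the exception mentioned above. Below is a snippet of my main class. I would appreciate any pointers on what I am doing wrong or how to fix the error.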
DataStream<CSBEvent> inputEventStream = env.addSource(
        new FlinkKafkaConsumer09<CSBEvent>("tester03", new EventDeserializationSchema(), properties));

Pattern<CSBEvent, ?> warningPattern = Pattern.<CSBEvent>begin("first")
        .subtype(CSBEvent.class)
        .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
        .next("second")
        .subtype(CSBEvent.class)
        .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
        .within(Time.seconds(10));

// Create a pattern stream from our warning pattern
PatternStream<CSBEvent> tempPatternStream = CEP.pattern(inputEventStream.keyBy("userID"), warningPattern); // .pattern(

// Generate temperature warnings for each matched warning pattern
DataStream<CSBEvent> warnings = tempPatternStream.select(
        (Map<String, CSBEvent> pattern) -> {
            CSBEvent first = (CSBEvent) pattern.get("first");
            CSBEvent second = (CSBEvent) pattern.get("second");
            return new CSBEvent(second.getUserID());
        }
);

// Alert pattern: Two consecutive request failure warnings appearing within a time interval of 20 seconds
Pattern<CSBEvent, ?> alertPattern = Pattern.<CSBEvent>begin("first")
        .next("second")
        .within(Time.seconds(20));

// Create a pattern stream from our alert pattern
PatternStream<CSBEvent> alertPatternStream = CEP.pattern(
        warnings.keyBy("userID"),
        alertPattern);
....
Here is the stack trace:
Exception in thread "main" java.lang.IndexOutOfBoundsException
    at org.apache.flink.api.java.typeutils.PojoTypeInfo.getTypeAt(PojoTypeInfo.java:259)
    at org.apache.flink.streaming.util.keys.KeySelectorUtil.getSelectorForKeys(KeySelectorUtil.java:62)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:276)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:272)
    at de.cep.CorrelatorVersion101.main(CorrelatorVersion101.java:61)
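
For readers unfamiliar with the CEP API, the first pattern in the snippet appears intended to flag two directly consecutive FAILED events for the same user within 10 seconds. The following is a minimal plain-Java sketch of that intent only. It is not Flink code and not a fix for the exception. The `Event` class, its `timestampMillis` field, and `detectWarnings` are hypothetical stand-ins that do not appear in the question; events are assumed to arrive in timestamp order, and treating the window as a strict upper bound is an assumption of this sketch rather than a statement about Flink's `within`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConsecutiveFailureSketch {

    /** Hypothetical stand-in for CSBEvent: only the values the pattern reads. */
    static final class Event {
        final String userID;
        final String status;
        final long timestampMillis; // assumed event time, not part of the question

        Event(String userID, String status, long timestampMillis) {
            this.userID = userID;
            this.status = status;
            this.timestampMillis = timestampMillis;
        }
    }

    /**
     * Returns one user ID per match, where a match is two directly consecutive
     * FAILED events of the same user that are less than windowMillis apart.
     * Events are grouped per user by remembering each user's previous event.
     */
    static List<String> detectWarnings(List<Event> events, long windowMillis) {
        Map<String, Event> previousByUser = new HashMap<>();
        List<String> warnings = new ArrayList<>();
        for (Event current : events) {
            // put() returns the user's previous event, or null for the first one
            Event previous = previousByUser.put(current.userID, current);
            if (previous != null
                    && "FAILED".equals(previous.status)
                    && "FAILED".equals(current.status)
                    && current.timestampMillis - previous.timestampMillis < windowMillis) {
                warnings.add(current.userID);
            }
        }
        return warnings;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("u1", "FAILED", 0L),
                new Event("u2", "FAILED", 1_000L),
                new Event("u1", "FAILED", 4_000L),   // second FAILED for u1 within 10 s
                new Event("u2", "OK", 5_000L),       // breaks the FAILED run for u2
                new Event("u2", "FAILED", 6_000L),
                new Event("u1", "FAILED", 30_000L)); // too late after the previous u1 event
        System.out.println(detectWarnings(events, 10_000L)); // prints [u1]
    }
}
```

Note that the grouping here uses the `userID` value read directly from each event object, whereas the snippet in the question keys the stream by the field name string "userID", which is the call the stack trace points at.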
【Discussion】:
Tags: java stream apache-flink