【问题标题】:java.lang.IndexOutOfBoundsException in Apache Flink keyBy functionApache Flink keyBy 函数中的 java.lang.IndexOutOfBoundsException
【发布时间】:2017-11-24 15:29:30
【问题描述】:

在 Apache Flink 中获取 Exception in thread "main" java.lang.IndexOutOfBoundsException。我试图按照here 的示例进行操作,但是我得到了提及豁免。我在下面有一个我的主要课程的 sn-p。对于我做错了什么或如何解决错误,我将不胜感激。

DataStream<CSBEvent> inputEventStream = env.addSource(
                new FlinkKafkaConsumer09<CSBEvent>("tester03", new EventDeserializationSchema(), properties));

 Pattern<CSBEvent, ?> warningPattern = Pattern.<CSBEvent>begin("first")
                .subtype(CSBEvent.class)
                .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
                .next("second")
                .subtype(CSBEvent.class)
                .where(evt -> evt.getEvent().getStatus().equals("FAILED"))
                .within(Time.seconds(10));

        // Create a pattern stream from our warning pattern
        PatternStream<CSBEvent> tempPatternStream = CEP.pattern(inputEventStream.keyBy("userID"), warningPattern); //   .pattern(

        // Generate temperature warnings for each matched warning pattern
        DataStream<CSBEvent> warnings = tempPatternStream.select(
            (Map<String, CSBEvent> pattern) -> {
                CSBEvent first = (CSBEvent) pattern.get("first");
                CSBEvent second = (CSBEvent) pattern.get("second");

                return new CSBEvent(second.getUserID());
            }
        );

        // Alert pattern: Two consecutive request failure warnings appearing within a time interval of 20 seconds
        Pattern<CSBEvent, ?> alertPattern = Pattern.<CSBEvent>begin("first")
                .next("second")
                .within(Time.seconds(20));

        // Create a pattern stream from our alert pattern
        PatternStream<CSBEvent> alertPatternStream = CEP.pattern(
                warnings.keyBy("userID"),
                alertPattern);

....这里是堆栈跟踪:

Exception in thread "main" java.lang.IndexOutOfBoundsException
at org.apache.flink.api.java.typeutils.PojoTypeInfo.getTypeAt(PojoTypeInfo.java:259)
at org.apache.flink.streaming.util.keys.KeySelectorUtil.getSelectorForKeys(KeySelectorUtil.java:62)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:276)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:272)
at de.cep.CorrelatorVersion101.main(CorrelatorVersion101.java:61)

【问题讨论】:

    标签: java stream apache-flink


    【解决方案1】:

    从您的代码看来,您使用的是 Flink 1.2 对吗?您是否考虑过更新到较新的版本?目前我们即将发布 Flink 1.4。

    另外,能否贴出整个堆栈跟踪,看看哪里抛出异常?

    【讨论】:

    • 我实际上在教程帖子中使用了 Flink 1.0,仍然是流处理领域的新手。但是,我将按照建议进行升级。我想更新 pom 文件中的版本就足够了?我的 Flink 安装版本是 1.3.2。我已经更新了完整的堆栈跟踪。
    • 如果你安装的是 Flink 1.3.2,并且你的代码是针对 Flink 1.0 构建的,那么你很可能会遇到问题。 Flink 1.3 和 1.1 之前的版本之间的向后兼容性已被破坏。
    猜你喜欢
    • 2020-12-24
    • 1970-01-01
    • 2022-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-19
    • 1970-01-01
    相关资源
    最近更新 更多