【问题标题】:Flink apply function on timeWindowFlink 在 timeWindow 上应用函数
【发布时间】:2017-12-18 08:28:01
【问题描述】:

我目前正在做一个 Flink 项目。该项目的主要思想是读取 JSON(网络日志)的数据流,将它们关联起来,并生成一个新的 JSON,它是不同 JSON 信息的组合。

此时,我可以读取 JSON,生成 KeyedStream(基于生成日志的机器),然后生成 5 秒的窗口流。

我要执行的下一步是对窗口使用apply函数并组合每个JSON的信息。我有点不知道该怎么做。

我目前拥有的代码如下:

DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

        @Override
        public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
                Collector<Tuple2<String, JSONObject>> arg3) throws Exception {


        }

.apply(new generateMetaAlert()) 部分抱怨下一个错误:

WindowedStream,Tuple,TimeWindow> 类型中的方法 apply(WindowFunction,R,Tuple,TimeWindow>) 不适用于参数 (MetaAlertGenerator.generateMetaAlert)

还有其他与我提出的不同的代码结构建议吗?

提前感谢您的帮助

【问题讨论】:

  • 我认为这是 this question 的副本。如果答案解决了您的问题,请检查并关闭。

标签: java json apache-flink flink-streaming


【解决方案1】:

当您应用keyBy 函数(不使用匿名类)时,自定义WindowFunction(第三个字段)中的密钥类型应为Tuple,因为编译器无法确定您的密钥类型。这段代码编译没有错误(考虑到我试图用虚拟代码填充空白):

public class Test {

    public Test() {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());

    }

    public class JSONObject {
    }

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}

但最直接的方法是使用匿名类,这样您就可以保留String 类型:

DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });

最后,如果你想保留类,但又想保持你的key的类型不变,你可以实现一个KeySelector

public class Test {

    public Test() {

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0;
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}

【讨论】:

    猜你喜欢
    • 2020-08-31
    • 1970-01-01
    • 2016-08-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多