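When you apply keyBy by field position, as in keyBy(0), the key type in your custom WindowFunction (its third type parameter) has to be Tuple, because the compiler cannot determine the type of your key from a position alone. The following code compiles without errors (bear in mind that I filled in the blanks with dummy code):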
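import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Test {
    public Test() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");
        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0) // positional key: the key type seen downstream is Tuple
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    // Dummy placeholder for the real JSON type.
    public class JSONObject {
    }

    // Dummy parser: the real implementation would emit (key, json) pairs.
    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
        }
    }

    // The third type parameter is Tuple because the stream was keyed by position.
    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
        }
    }
}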
The most direct approach, though, is to pass an anonymous class inline. Note that with keyBy(0) the key still arrives as a Tuple, so the String value has to be read out of it (for example with tuple.getField(0)):
DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });
Finally, if you want to keep the named class but also keep the type of your key unchanged, you can implement a KeySelector:
import org.apache.flink.api.java.functions.KeySelector;
// Remaining imports, env, events, JSONObject and JSONParser are the same as in the first example.

public class Test {
    public Test() {
        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                // The KeySelector declares String as the key type explicitly.
                .keyBy(new KeySelector<Tuple2<String, JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0;
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    // The third type parameter is now String, matching the KeySelector's return type.
    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
        }
    }
}
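
To illustrate why the key type differs between the two keying styles, here is a small plain-Java sketch that does not use Flink at all. The names KeyTypeSketch, keyByPosition and keyBySelector are hypothetical, invented for this illustration, and are not Flink APIs; the grouping logic is only a rough model of the idea. A positional key gives the compiler nothing but an int, so the key can only be typed as a generic container, whereas a selector function carries its return type in its generic signature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of the two keying styles; none of these names are Flink APIs.
public class KeyTypeSketch {

    // Positional keying: the compiler only sees an int index, so the key
    // can be typed no more precisely than a generic wrapper (List<Object> here).
    public static Map<List<Object>, List<Object[]>> keyByPosition(List<Object[]> rows, int pos) {
        Map<List<Object>, List<Object[]>> groups = new LinkedHashMap<>();
        for (Object[] row : rows) {
            List<Object> key = Collections.singletonList(row[pos]); // untyped wrapper around the field
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    // Selector keying: K is inferred from the function, so the key stays typed.
    public static <T, K> Map<K, List<T>> keyBySelector(List<T> rows, Function<T, K> selector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T row : rows) {
            groups.computeIfAbsent(selector.apply(row), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[] {"a", 1}, new Object[] {"b", 2}, new Object[] {"a", 3});

        // Positional: the key has to be unwrapped and cast by hand.
        for (List<Object> key : keyByPosition(rows, 0).keySet()) {
            String k = (String) key.get(0);
            System.out.println("positional key: " + k);
        }

        // Selector: the key is already a String, no cast at the use site.
        Map<String, List<Object[]>> typed = keyBySelector(rows, r -> (String) r[0]);
        for (String k : typed.keySet()) {
            System.out.println("selector key: " + k);
        }
    }
}
```

The same trade-off shows up in the Flink snippets above: keyBy(0) forces Tuple as the WindowFunction key type, while a KeySelector lets the key type be declared as String.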