【发布时间】:2020-07-06 02:18:29
【问题描述】:
我正在使用 Flink 来处理我的流数据。
流来自其他一些中间件,例如 Kafka、Pravega 等。
说 Pravega 正在发送一些字流,hello world my name is...。
我需要的是三个步骤:
- 将每个单词映射到我的自定义类对象
MyJson。 - 将对象
MyJson映射到字符串。 - 将字符串写入文件:将一个字符串写入一个文件。
例如,对于流hello world my name is,我应该得到五个文件。
这是我的代码:
// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
.map(new MapFunction<String, MyJson>() {
@Override
public MyJson map(String s) throws Exception {
MyJson myJson = JSON.parseObject(s, MyJson.class);
return myJson;
}
});
// map MyJson to String
DataStream<String> valueInJson = jsonStream
.map(new MapFunction<MyJson, String>() {
@Override
public String map(MyJson myJson) throws Exception {
return myJson.toString();
}
});
// output
valueInJson.print();
此代码会将所有结果输出到 Flink 日志文件。
我的问题是如何将一个单词写入一个输出文件?
【问题讨论】:
-
当一个词被重复时你想发生什么?你看过
writeAsText和/或StreamingFileSink吗? -
@DavidAnderson 我不在乎重复的单词。一个字,一个文件。就是这样。可以根据事件时间生成文件名,确保每个文件名都是唯一的。
-
仅供参考,我更新了我的答案,提到在恢复期间需要注意不要产生重复的结果——如果你在乎的话。
标签: streaming apache-flink flink-streaming