【问题标题】:How to generate output files for each input in Apache Flink如何在 Apache Flink 中为每个输入生成输出文件
【发布时间】:2020-07-06 02:18:29
【问题描述】:

我正在使用 Flink 来处理我的流数据。

流来自其他一些中间件,例如 Kafka、Pravega 等。

说 Pravega 正在发送一些字流,hello world my name is...

我需要的是三个步骤:

  1. 将每个单词映射到我的自定义类对象MyJson
  2. 将对象MyJson 映射到字符串。
  3. 将字符串写入文件:将一个字符串写入一个文件。

例如,对于流hello world my name is,我应该得到五个文件。

这是我的代码:

// init Pravega connector
PravegaDeserializationSchema<String> adapter = new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
        FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
                .withPravegaConfig(pravegaConfig)
                .forStream(stream)
                .withDeserializationSchema(adapter)
                .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
            .map(new MapFunction<String, MyJson>() {
                @Override
                public MyJson map(String s) throws Exception {
                    MyJson myJson = JSON.parseObject(s, MyJson.class);
                    return myJson;
                }
            });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
            .map(new MapFunction<MyJson, String>() {
                @Override
                public String map(MyJson myJson) throws Exception {
                    return myJson.toString();
                }
            });
// output
valueInJson.print();

此代码会将所有结果输出到 Flink 日志文件。

我的问题是如何将一个单词写入一个输出文件?

【问题讨论】:

  • 当一个词被重复时你想发生什么?你看过writeAsText和/或StreamingFileSink吗?
  • @DavidAnderson 我不在乎重复的单词。一个字,一个文件。就是这样。可以根据事件时间生成文件名,确保每个文件名都是唯一的。
  • 仅供参考,我更新了我的答案,提到在恢复期间需要注意不要产生重复的结果——如果你在乎的话。

标签: streaming apache-flink flink-streaming


【解决方案1】:

我认为最简单的方法是使用自定义接收器。

stream.addSink(new WordFileSink)
public static class WordFileSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // generate a unique name for the new file and open it
        // write the word to the file
        // close the file
    }
}

请注意,此实现不一定提供仅一次的行为。您可能需要注意文件命名方案是唯一的和确定性的(而不是取决于处理时间),并为文件可能已经存在的情况做好准备。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-01-13
    • 2019-01-14
    • 2021-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多