【问题标题】:FileSink in Apache Flink not generating logs in output folderApache Flink 中的 FileSink 不在输出文件夹中生成日志
【发布时间】:2022-01-13 11:57:43
【问题描述】:

我正在使用 Apache Flink 从 kafka 主题中读取数据并将其存储在服务器上的文件中。我正在使用 FileSink 存储文件,它会根据日期和时间创建目录结构,但不会创建日志文件。

当我运行程序时,它会创建如下目录结构,但日志文件没有存储在这里。

/flink/testlogs/2021-12-08--07 
/flink/testlogs/2021-12-08--06

我希望每 15 分钟将日志文件写入一个新的日志文件。 下面是代码。

DataStream <String> kafkaTopicData = env.addSource(new FlinkKafkaConsumer<String>("MyTopic",new SimpleStringSchema(),p)); 

OutputFileConfig config = OutputFileConfig
                 .builder()
                 .withPartPrefix("prefix")
                 .withPartSuffix(".ext")
                 .build();

DataStream <Tuple6 < String,String,String ,String, String ,Integer >> newStream=kafkaTopicData.map(new LogParser());

final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink.forRowFormat(new Path("/flink/testlogs"),
                  new SimpleStringEncoder < Tuple6 < String,String,String ,String, String ,Integer >> ("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                .withOutputFileConfig(config)
                .build();
    
        newStream.sinkTo(sink);

env.execute("DataReader");  

LogParser returns Tuple6. 

【问题讨论】:

  • 检查点是否启用? FileSink 仅在检查点完成时才最终确定它写入的文件。
  • @DavidAnderson 检查点未启用。您能建议如何启用它吗?

标签: apache-kafka apache-flink flink-streaming


【解决方案1】:

在流模式下使用时,Flink 的FileSink 要求启用检查点。为此,您需要指定检查点的存储位置,以及您希望它们发生的时间间隔。

要在flink-conf.yaml 中进行配置,您可以执行以下操作:

state.checkpoints.dir: s3://checkpoint-bucket
execution.checkpointing.interval: 10s

或者在您的应用程序代码中,您可以这样做:

env.getCheckpointConfig().setCheckpointStorage("s3://checkpoint-bucket");
env.enableCheckpointing(10000L);

docs 的另一个重要细节:

鉴于 Flink 接收器和 UDF 通常不会区分正常作业终止(例如有限输入流)和因故障而终止,因此在作业正常终止时,最后一个正在进行的文件不会转换到“完成”状态。

【讨论】:

  • 感谢它解决了现在正在创建文件的问题。但是我将滚动间隔设置为 15 分钟,但文件每分钟创建一次。 withRollingPolicy(DefaultRollingPolicy.builder() .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
猜你喜欢
  • 1970-01-01
  • 2019-02-14
  • 2018-07-12
  • 2016-05-12
  • 2022-09-27
  • 2018-02-22
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多