【发布时间】:2022-01-13 11:57:43
【问题描述】:
我正在使用 Apache Flink 从 kafka 主题中读取数据并将其存储在服务器上的文件中。我正在使用 FileSink 存储文件,它会根据日期和时间创建目录结构,但不会创建日志文件。
当我运行程序时,它会创建如下目录结构,但日志文件没有存储在这里。
/flink/testlogs/2021-12-08--07
/flink/testlogs/2021-12-08--06
我希望每 15 分钟将日志文件写入一个新的日志文件。 下面是代码。
DataStream <String> kafkaTopicData = env.addSource(new FlinkKafkaConsumer<String>("MyTopic",new SimpleStringSchema(),p));
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".ext")
.build();
DataStream <Tuple6 < String,String,String ,String, String ,Integer >> newStream=kafkaTopicData.map(new LogParser());
final FileSink<Tuple6<String, String, String, String, String, Integer>> sink = FileSink.forRowFormat(new Path("/flink/testlogs"),
new SimpleStringEncoder < Tuple6 < String,String,String ,String, String ,Integer >> ("UTF-8"))
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.withOutputFileConfig(config)
.build();
newStream.sinkTo(sink);
env.execute("DataReader");
LogParser returns Tuple6.
【问题讨论】:
-
检查点是否启用? FileSink 仅在检查点完成时才最终确定它写入的文件。
-
@DavidAnderson 检查点未启用。您能建议如何启用它吗?
标签: apache-kafka apache-flink flink-streaming