【发布时间】:2021-10-12 10:08:26
【问题描述】:
我们有一个使用 Java 编写并在 AWS Kinesis Data Analytics 上运行的 flink 应用程序。应用程序从 AWS Managed Service Kafka(kafka 主题 1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写入另一个 Kafka 主题(kafka 主题 2)。
并行度为 10,主题有 15 个分区。 期望在 5 分钟内处理约 20K 并发数据。但经过所有优化后,我们可以在 25 分钟内将其提高到约 20K 并发数据的速度。
能否请您告诉我是否可以实施任何其他性能优化来实现目标。
Flink Async I/O 是否会成为进一步优化的选项?
示例代码:-
StreamExecutionEnvironment streamenv =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> initialStreamData = streamenv
.addSource(new FlinkKafkaConsumer<>(
TOPIC_NAME,
new ObjectNodeJsonDeSerializerSchema(),
kafkaConnectProperties);
initialStreamData.print();
DataStream<POJO> rawDataProcess = initialStreamData
.rebalance()
.flatMap(new ReProcessingDataProvider())
.keyBy(value -> value.getPersonId());
rawDataProcess.print();
DataStream<POJO> cgmStream = rawDataProcess
.keyBy(new ReProcessorKeySelector())
.rebalance()
.flatMap(new SgStreamTask());
cgmStream.print();
DataStream<POJO> artfctOverlapStream = null;
artfctOverlapStream = cgmStreamData
.keyBy(new CGMKeySelector())
.countWindow(2, 1)
.apply(new ArtifactOverlapProvider()); //the same person_id key
cgmStreamData.print();
DataStream<POJO> streamWithSgRoc = null;
streamWithSgRoc = artfctOverlapStream
.keyBy(new CGMKeySelector())
.countWindow(7, 1)
.apply(new SgRocProvider()); // the same person_id key
streamWithSgRoc.print();
DataStream<POJO> cgmExcursionStream = null;
cgmExcursionStream = streamWithSgRoc
.keyBy(new CGMKeySelector())
.countWindow(Common.THREE, Common.ONE)
.apply(new CGMExcursionProviderStream()); //the same person_id key
cgmExcursionStream.print();
cgmExcursionStream
.addSink(new FlinkKafkaProducer<CGMDataCollector>(
topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
new CGMDataCollectorSchema(),
kafkaConnectProperties));
【问题讨论】:
-
我对 Flink 配置选项不是很熟悉,但是“并行度为 10”是否意味着您在一个应用程序实例上有 10 个线程,有 10 个单独的运行实例,或者介于两者之间?
-
我相信实例只有一个,但会有 10 个任务槽分配给 Flink 作业
-
调度程序中有一个“任务槽”吗?如果是这样,那听起来像是单独的实例。无论如何,由于您只在两个 Kafka 主题之间进行,您是否将运行时与 Kafka Streams 进行了比较?可能限制不是 Flink
-
是的,我已经检查了 Kafka 流的运行时间,我们还有其他非 flink 应用程序的 kafka 流应用程序,它们的吞吐量比 flink 应用程序更好
标签: java apache-kafka apache-flink flink-streaming