【问题标题】:Apache Flink using Java - Performance Issue使用 Java 的 Apache Flink - 性能问题
【发布时间】:2021-10-12 10:08:26
【问题描述】:

我们有一个使用 Java 编写并在 AWS Kinesis Data Analytics 上运行的 flink 应用程序。应用程序从 AWS Managed Service Kafka(kafka 主题 1)读取输入流,然后应用业务逻辑(一些计算),最后将输出写入另一个 Kafka 主题(kafka 主题 2)。

并行度为 10,主题有 15 个分区。 期望在 5 分钟内处理约 20K 并发数据。但经过所有优化后,我们可以在 25 分钟内将其提高到约 20K 并发数据的速度。

能否请您告诉我是否可以实施任何其他性能优化来实现目标。

Flink Async I/O 是否会成为进一步优化的选项?

示例代码:-

StreamExecutionEnvironment streamenv =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<ObjectNode> initialStreamData = streamenv
    .addSource(new FlinkKafkaConsumer<>(
        TOPIC_NAME, 
        new ObjectNodeJsonDeSerializerSchema(),
        kafkaConnectProperties);

initialStreamData.print();

DataStream<POJO> rawDataProcess = initialStreamData
    .rebalance()
    .flatMap(new ReProcessingDataProvider())
    .keyBy(value -> value.getPersonId());

rawDataProcess.print();

DataStream<POJO> cgmStream = rawDataProcess
    .keyBy(new ReProcessorKeySelector())
    .rebalance()
    .flatMap(new SgStreamTask());

cgmStream.print();

DataStream<POJO> artfctOverlapStream = null;
artfctOverlapStream = cgmStreamData
    .keyBy(new CGMKeySelector())
    .countWindow(2, 1)
    .apply(new ArtifactOverlapProvider()); //the same person_id key

cgmStreamData.print();

DataStream<POJO> streamWithSgRoc = null;

streamWithSgRoc = artfctOverlapStream
    .keyBy(new CGMKeySelector())
    .countWindow(7, 1)
    .apply(new SgRocProvider()); // the same person_id key 

streamWithSgRoc.print();

DataStream<POJO> cgmExcursionStream = null;

cgmExcursionStream = streamWithSgRoc
    .keyBy(new CGMKeySelector())
    .countWindow(Common.THREE, Common.ONE)
    .apply(new CGMExcursionProviderStream()); //the same person_id key

cgmExcursionStream.print();

cgmExcursionStream
    .addSink(new FlinkKafkaProducer<CGMDataCollector>(
        topicsProperties.getProperty(Common.CGM_EVENT_TOPIC),
        new CGMDataCollectorSchema(),
        kafkaConnectProperties));

【问题讨论】:

  • 我对 Flink 配置选项不是很熟悉,但是“并行度为 10”是否意味着您在一个应用程序实例上有 10 个线程,有 10 个单独的运行实例,或者介于两者之间?
  • 我相信实例只有一个,但会有 10 个任务槽分配给 Flink 作业
  • 调度程序中有一个“任务槽”吗?如果是这样,那听起来像是单独的实例。无论如何,由于您只在两个 Kafka 主题之间进行,您是否将运行时与 Kafka Streams 进行了比较?可能限制不是 Flink
  • 是的,我已经检查了 Kafka 流的运行时间,我们还有其他非 flink 应用程序的 kafka 流应用程序,它们的吞吐量比 flink 应用程序更好

标签: java apache-kafka apache-flink flink-streaming


【解决方案1】:

在您分享的内容中,我没有看到任何可以解释吞吐量不佳的内容。但既然你问过异步 i/o,我想知道平面图是否在做一些外部 i/o。如果是这样,那就可以解释了。如果是这种情况,那么使用异步 i/o 应该会有很大帮助,前提是外部服务可以处理增加的负载。

我也想知道为什么并行度是 10,以及这 10 个插槽有哪些资源可用。是否有足够的核心来保持一切运转?对于 15 个分区,您有 5 个插槽,每个插槽处理两个分区,另外 5 个插槽每个处理一个分区。 5、8 和 15 是更明显的并行选择。 (当然,如果每个 slot 也命中了 flatmap 中的外部服务,那也需要考虑在内。)

看到代码后更新:

您可以做一些简单的事情来加快速度。

要做的一件事是给集群更多的资源。您可以保持并行度不变,但通过将任务管理器放置在具有更多内核的机器上,为每个插槽提供更多可使用的内核。

但在此之前,请先了解一下优化管道。 rebalance 和 keyBy 都相当昂贵,而且您使用它们的次数超出了必要的范围。一个 keyBy 紧跟一个再平衡是没有意义的,两个 keyBy 一个接一个也没有意义。

Rebalance 对流进行往返重新分区。它通常在更改并行性或需要克服数据倾斜时完成。几乎从不使用重新平衡,除非在更改并行度时根据需要隐式使用。

KeyBy 执行基于键的重新分区。如果一个 keyBy 跟随另一个,第二个会撤消第一个所做的。

keyBy 和 rebalance 都需要序列化和反序列化每个事件,并通过网络堆栈发送它们。这是您仅在绝对必要时才想做的事情。

修复这些重新平衡/keyBy 问题将减少集群上的工作负载。如果这还不足以实现所需的吞吐量,那么为每个插槽提供更多内核(以便管道的各个阶段可以并行运行)应该可以解决问题。

【讨论】:

  • 不,平面图不进行任何外部调用,所以我相信异步 i/o 在这里没有帮助。我们有一些复杂的业务逻辑(计算)在 RichFlatMapFunction() 的 apply() 方法中执行。我试图将并行度设为 15,但由于某种原因,如果我的并行度超过 10,AWS Kinesis 分析会引发连接异常,已经向他们提出了问题单。您能否建议一个明显的分区、并行性和内核,以在 5 分钟内实现 20K 的吞吐量,其中 1 条消息的大小为 30KB。目前核心分配为 1
  • 是每个任务管理器 1 个核心,还是整个集群一个核心?你有多少个任务管理器?
  • 是的,每个任务管理器 1 个核心。我现在已将并行度更改为 8,具有 15 个主题分区。再次测试,吞吐量仅增加了(3 分钟),即 22 分钟内增加了 20K(与之前的 25 分钟相比)。并行度 - 8,任务管理器 - 8,任务槽 - 8,CPU 核心 - 每个任务 1 个,JVM 堆 - 2.70 GB,Flink 内存 - 1.88 GB。谢谢!
  • (1) 将并行度增加到 15 应该会使吞吐量大约翻倍/时间减半,因此弄清楚为什么这不起作用是有意义的。 (2) 管道是否在任何地方都有 keyBy,或者所有操作是否链接在一起成为一个任务(换句话说,作业图是什么样的)? (3) 启用对象重用应该会有所帮助。 (4) RichFlatMapFunction 没有 apply() 方法。 (5) 建议你使用 profiler 来查看时间的去向。
  • 为延迟回复道歉。 (1) 对于 Kinesis Analytics 15 并行度的问题,我仍然没有得到解决方案。 (2) 是的,管道有一个keyBy 和countWindow 操作符。我可以看到流分布在所有运算符的所有 8 个插槽中。 (3) 我害怕启用对象重用,因为它们在方法之间被引用。这是我的完整示例代码,如果我在这里犯了任何错误,请告诉我。
猜你喜欢
  • 2016-02-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-29
  • 1970-01-01
  • 2012-01-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多