【问题标题】:numPartitions for reduceByKey not affecting speedreduceByKey 的 numPartitions 不影响速度
【发布时间】:2016-08-16 14:32:24
【问题描述】:

我正在接收时间序列数据,并且希望仅保留/更新我们根据特定键获得的最新条目。我们曾经使用mapWithState 聚合事物,我们能够在我们的 qa 环境中以大约 6k/秒的速度在本地处理大约 1k/秒,在我们最强大的环境中大约为 45k/秒。

我删除了很多代码,因为我们有一些需求更改,我认为我看到这种缓慢行为的原因是因为 reduceByKey 这是我拥有的少量代码:

rawStream
    .map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
    // Select the last value for the key
    .reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
        if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
            accumulator
        } else {
            aggregationResult
        }
    }, numPartitions = numberOfReduceTasks)
    // Send back table name and aggregate
    .map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )

在本地处理 500 个数据点大约需要 3-5 分钟,而且在我们的 qa 环境中处理小批量时速度非常慢。我知道应该有一个慢下来,因为之前一切都是一个阶段,现在因为洗牌,它分成了两个阶段,洗牌需要很长时间。我应该使用理想的numPartitions 值吗?就像每个核心应该添加 X 个分区,或者每个内存应该添加 X 个更多分区。我一直在本地运行它,并试图弄清楚它,但没有什么能真正让我得到合理的处理时间。

【问题讨论】:

  • 一般建议,分区数 > 核心数。试试,分区数 = 2* 个核心。尝试在 spark UI 中查看数据是否均匀分布。
  • 好吧,虽然你认为看到这样的性能下降很正常吗?
  • 重新分区会导致随机播放,这反过来通常会导致数据通过网络传输,从而在很大程度上影响性能。您可以阅读这篇 (blog.cloudera.com/blog/2015/03/…) 文章并根据您的 Spark 配置和集群规范来决定。

标签: scala apache-spark mapreduce spark-streaming


【解决方案1】:

我在一个 RDD 中有大约 2000 个项目的小型集群上使用 Spark 有过类似的体验。重新分区到许多不同的分区计数并没有什么不同。一旦我用更多的项目(大约 4000 个,但我认为这取决于你拥有的执行者的数量)运行它,它就开始按预期运行。尝试使用更多数据点运行它。

【讨论】:

  • 我想我有点注意到了。我会尝试并实际验证这一点,然后我会回来接受答案。谢谢。
  • 您能验证吗?你得到了什么结果?
  • 是的,在 4 个执行器上处理 19k 个事件需要 5s,处理 1.5k 个事件需要 3s。
猜你喜欢
  • 2020-09-04
  • 2013-01-07
  • 1970-01-01
  • 2021-06-30
  • 1970-01-01
  • 2014-08-03
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多