【发布时间】:2016-08-16 14:32:24
【问题描述】:
我正在接收时间序列数据,并且希望仅保留/更新我们根据特定键获得的最新条目。我们曾经使用mapWithState 聚合事物,我们能够在我们的 qa 环境中以大约 6k/秒的速度在本地处理大约 1k/秒,在我们最强大的环境中大约为 45k/秒。
我删除了很多代码,因为我们有一些需求更改,我认为我看到这种缓慢行为的原因是因为 reduceByKey 这是我拥有的少量代码:
rawStream
.map(timeSeries => (timeSeries.key, AggregationEngine.createSingleItemAggregate(timeSeries)))
// Select the last value for the key
.reduceByKey((accumulator: AggregationResult, aggregationResult: AggregationResult) => {
if (accumulator.firstTimeMillis > aggregationResult.firstTimeMillis) {
accumulator
} else {
aggregationResult
}
}, numPartitions = numberOfReduceTasks)
// Send back table name and aggregate
.map(aggResultPair => (aggResultPair._2.tableName, aggResultPair._2) )
在本地处理 500 个数据点大约需要 3-5 分钟,而且在我们的 qa 环境中处理小批量时速度非常慢。我知道应该有一个慢下来,因为之前一切都是一个阶段,现在因为洗牌,它分成了两个阶段,洗牌需要很长时间。我应该使用理想的numPartitions 值吗?就像每个核心应该添加 X 个分区,或者每个内存应该添加 X 个更多分区。我一直在本地运行它,并试图弄清楚它,但没有什么能真正让我得到合理的处理时间。
【问题讨论】:
-
一般建议,分区数 > 核心数。试试,分区数 = 2* 个核心。尝试在 spark UI 中查看数据是否均匀分布。
-
好吧,虽然你认为看到这样的性能下降很正常吗?
-
重新分区会导致随机播放,这反过来通常会导致数据通过网络传输,从而在很大程度上影响性能。您可以阅读这篇 (blog.cloudera.com/blog/2015/03/…) 文章并根据您的 Spark 配置和集群规范来决定。
标签: scala apache-spark mapreduce spark-streaming