【问题标题】:Skewed By in Spark在 Spark 中偏斜
【发布时间】:2019-03-28 05:14:23
【问题描述】:

我有一个数据集,我想按特定键 (clientID) 进行分区,但有些客户端产生的数据远远多于其他客户端。 Hive 中有一个名为“ListBucketing”的功能,由“skewed by”调用,专门用于处理这种情况。

但是,我找不到任何迹象表明 Spark 支持此功能,或者如何(如果支持)使用它。

是否有与之等效的 Spark 功能?或者,Spark 是否有一些其他功能可以复制这种行为?

(作为奖励 - 以及我的实际用例的要求 - 您的建议方法是否适用于 Amazon Athena?)

【问题讨论】:

标签: apache-spark hive


【解决方案1】:

据我所知,Spark 中没有这种开箱即用的工具。在数据倾斜的情况下,很常见的是添加一个人工列来进一步对数据进行分桶。

假设您想按列“y”进行分区,但数据非常倾斜,就像这个玩具示例中一样(1 个分区有 5 行,其他分区只有 1 行):

val df = spark.range(8).withColumn("y", when('id < 5, 0).otherwise('id))
df.show()
+---+---+
| id|  y|
+---+---+
|  0|  0|
|  1|  0|
|  2|  0|
|  3|  0|
|  4|  0|
|  5|  5|
|  6|  6|
|  7|  7|
+-------+

现在让我们添加一个人工随机列并写入数据框。

val maxNbOfBuckets = 3
val part_df = df.withColumn("r", floor(rand() * nbOfBuckets))
part_df.show
+---+---+---+
| id|  y|  r|
+---+---+---+
|  0|  0|  2|
|  1|  0|  2|
|  2|  0|  0|
|  3|  0|  0|
|  4|  0|  1|
|  5|  5|  2|
|  6|  6|  2|
|  7|  7|  1|
+---+---+---+

// and writing. We divided the partition with 5 elements into 3 partitions.
part_df.write.partitionBy("y", "r").csv("...")

【讨论】:

  • 这将存储数据(这已经可行,Spark 将其作为一项功能)。但是,您可以将列值 lambda 替换为更复杂的东西; (val in LIST) ? val : "others" 这样就可以了。
  • 也应该在 Athena 中工作,尽管我需要使用 clientPartition 并将 clientID 作为常规列来伪造 skewed by 功能。
  • 令人印象深刻的解决方案@Oli。 rand() 的随机性如何?与仅一个节点(coalesce(1))执行相比,并行执行是否会增加冲突的可能性,即许多相同的值?
  • SparkSQL 函数旨在在分布式数据集上调用,根据文档,示例是 i.i.d.。因此,虽然我不知道实现的细节(所以我可能是错的),我会说即使没有coalesce(1),它也能正常工作。
  • 我的意思是issue,但对于像 Spark 这样的分布式系统。我们如何保证不同节点的均匀分布?我想我们目前不能,因为 Random 的不同实例不能保证这一点。内置功能可能会以某种方式解决它,但我猜性能仍然是这种功能的阻碍因素
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-01-17
  • 2014-02-20
  • 2011-02-03
  • 1970-01-01
  • 2019-04-04
相关资源
最近更新 更多