【问题标题】:Controlling the data partition in Apache Spark控制 Apache Spark 中的数据分区
【发布时间】:2015-09-25 05:05:26
【问题描述】:

数据看起来像:

col 1 col 2 col 3 col 4
第 1 行 第 1 行 第 1 行 第 1 行
第 2 行 第 2 行 第 2 行 第 2 行
第 3 行 第 3 行 第 3 行 第 3 行
第 4 行 第 4 行 第 4 行 第 4 行
第 5 行 第 5 行 第 5 行 第 5 行
第6行 第6行 第6行 第6行

问题:我想对这些数据进行分区,假设第 1 行和第 2 行将作为一个分区处理,第 3 行和第 4 行作为另一个分区,第 5 行和第 6 行作为另一个分区并创建一个 JSON 数据将它们合并在一起列(具有行中数据值的列标题)。

输出应该是这样的:
[
{col1:row1,col2:row1:col3:row1:col4:row1},
{col1:row2,col2:row2:col3:row2:col4:row2},
{col1:row3,col2:row3:col3:row3:col4:row3},
{col1:row4,col2:row4:col3:row4:col4:row4},...
]

我尝试使用 spark 中可用的 repartion(num),但它并不是我想要的分区。因此生成的 JSON 数据无效。我对为什么我的程序花费相同的时间来处理数据有疑问,即使我使用了不同数量的内核,可以找到 here 并且重新分区建议是由 @Patrick McGloin 提出的。该问题中提到的代码是我正在尝试做的事情。

【问题讨论】:

  • 我认为您的意思不是 partition,因为您可以从单个 RDD 生成 JSON,而无需特别关注分区。如果您知道您的键需要应用到的数组位置,您可以在 RDD map 调用中使用 scala 的 zip
  • 我使用 zip 功能将标题与行结合起来创建 json 数据,但传入的数据是微批量的,大约有 10000 行。它需要在不到 10 分钟的时间内进行处理,因此我需要使用更多分区来使用更多内核。如果我使用单个 rdd 而不对其进行分区,则任务大约需要 45 分钟,这是不可接受的,因为传入数据每 10 分钟出现一次。
  • 如果您只需要大小均匀的分区,那么repartition 应该可以满足您的需求。确保您的集群中有足够的工人/执行者来利用您的核心,否则分区将按顺序运行。另一方面,如果您需要一起处理特定的行子集,那么我同意下面的@Lukasz Tracewski,但您可以使用groupByKey,这会更简单。

标签: apache-spark apache-spark-sql spark-streaming


【解决方案1】:

猜你需要的是partitionBy。在 Scala 中,您可以为其提供自定义构建 HashParitioner,而在 Python 中,您可以传递 partitionFunc。 Scala 中有很多示例,所以让我简要解释一下 Python 的风格。

partitionFunc 需要一个元组,第一个元素是键。假设您按以下方式组织数据: (ROW_ID, (A,B,C,..)) 其中 ROW_ID = [1,2,3,...,k]。您可以随时添加 ROW_ID 并在之后将其删除。

每两行获取一个新分区:

rdd.partitionBy(numPartitions = int(rdd.count() / 2),
                partitionFunc = lambda key: int(key / 2)

partitionFunc 将产生一个序列 0,0,1,1,2,2,... 这个数字将是给定行所属的分区数。

【讨论】:

  • groupByKey 解决方案也可以。
猜你喜欢
  • 2016-02-05
  • 1970-01-01
  • 2016-08-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多