【问题标题】:split my dataframe depending on the number of nodes pyspark根据节点pyspark的数量拆分我的数据框
【发布时间】:2019-07-25 10:13:50
【问题描述】:

我正在尝试根据(我的集群的)节点数拆分我的数据帧,

我的数据框看起来像:

如果我有 node=2 和 dataframe.count=7 :

因此,应用迭代方法,拆分的结果将是:

我的问题是:我该怎么做?

【问题讨论】:

  • 您的预期结果是什么?集群上的两个对象(即两个数据框)还是一个数据均匀分布的对象?
  • 我的预期结果是根据节点数拆分我的数据帧块(即,如果我的数据帧中有 10 行和 3 个节点,第一个节点将占用 3 行,第二个需要 3 lines ,3d 需要 3 行,最后一行将在一个节点中(迭代方法)
  • 但是你想要多少个对象?四个或三个对象还是只有一个?
  • 如果你的意思是一个对象是一个数据框,我想要一个数据框(大小可变)

标签: python-3.x pyspark nodes cluster-computing


【解决方案1】:

您可以使用 rdd 分区函数之一来做到这一点(请看下面的代码),但我不推荐它作为
只要您不完全了解自己在做什么以及您这样做的原因。一般来说(或者对于大多数用例来说更好)最好让 spark 处理数据分布。

import pyspark.sql.functions as F
import itertools
import math

#creating a random dataframe
l = [(x,x+2) for x in range(1009)]

columns = ['one', 'two']

df=spark.createDataFrame(l, columns)

#create on partition to asign a partition key
df = df.coalesce(1)

#number of nodes (==partitions)
pCount = 5

#creating a list of partition keys
#basically it repeats range(5) several times until we have enough keys for each row
partitionKey = list(itertools.chain.from_iterable(itertools.repeat(x, math.ceil(df.count()/pCount)) for x in range(pCount)))

#now we can distribute the data to the partitions
df = df.rdd.partitionBy(pCount, partitionFunc = lambda x: partitionKey.pop()).toDF()

#This shows us the number of records within each partition
df.withColumn("partition_id", F.spark_partition_id()).groupBy("partition_id").count().show()

输出:

+------------+-----+ 
|partition_id|count| 
+------------+-----+ 
|           1|  202| 
|           3|  202| 
|           4|  202| 
|           2|  202| 
|           0|  201| 
+------------+-----+

【讨论】:

  • 谢谢你,但它在转换为 DF 的部分出现问题。[Stage 10:> (0 + 1) / 1]19/07/26 16:38:08 ERROR Executor: Exception在阶段 10.0 (TID 10) 中的任务 0.0 文件“C:\Users\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\rdd.py”,第 1771 行,在 add_shuffle_key for k 中, v in iterator: ValueError: not enough values to unpack (expected 2, got 1)
  • 您的 partitionKey 列表是否足够长(即等于或大于 df.count())?您能否将整个错误消息添加到您的初始问题中?
  • 是的,我的 partitionKey 大于 df.count(),我添加了整个错误信息
  • 哪里添加了整个错误信息?顺便问一下,我的例子对你有用吗?
  • 是的,你的例子对我有用,(整个错误信息如下)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2022-01-13
  • 2023-03-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-24
  • 1970-01-01
相关资源
最近更新 更多