【发布时间】:2017-12-20 20:33:33
【问题描述】:
from pyspark.sql import Row
一个 Row 对象是不可变的。它可以转换为 Python 字典,然后变异,然后返回为 Row 对象。有没有办法在不转换为字典并返回行的情况下制作可变或变异的副本?
这是在 mapPartitions 中运行的函数中需要的。
【问题讨论】:
标签: pyspark spark-dataframe rdd
from pyspark.sql import Row
一个 Row 对象是不可变的。它可以转换为 Python 字典,然后变异,然后返回为 Row 对象。有没有办法在不转换为字典并返回行的情况下制作可变或变异的副本?
这是在 mapPartitions 中运行的函数中需要的。
【问题讨论】:
标签: pyspark spark-dataframe rdd
row.asDict() 和 **dict 都不保留字段的顺序。请注意,在 python 3.6+ 中,这可能会改变。见PEP 468
类似于@hahmed 所说的。这会动态创建一个变异的行,但与传入的行具有相同的架构。
from pyspark.sql import Row
from collections import OrderedDict
def copy(row, **kwargs):
d = OrderedDict(zip(row.__fields__, row)) #note this is not recursive
for key, value in kwargs.iteritems():
d[key]=value
MyRow = Row(row.__fields__)
return MyRow(*d.values())
如果您需要将数据框转换为 RDD,然后再次使其成为 DF,这将非常有用
例如。
df_schema = df.schema
rdd = df_schema.rdd.map(lambda row: copy(row, field=newvalue))
new_df = spark.createDataFrame(rdd, df_schema)
【讨论】:
这是我想出的制作变异副本的动态解决方案:
from pyspark.sql import Row
def copy(row, **kwargs):
dict = {}
for attr in list(row.__fields__):
dict[attr] = row[attr]
for key, value in kwargs.items():
dict[key] = value
return Row(**dict)
row = Row(name="foo", age=45)
print(row) #Row(age=45, name='foo')
new_row = copy(row, name="bar")
print(new_row) #Row(age=45, name='bar')
【讨论】:
根据您的实际用例,一种可能性是简单地从现有的 Row 对象创建一个新的对象。
from pyspark.sql import Row
R = Row('a', 'b', 'c')
r = R(1,2,3)
假设我们要将a 的a 更改为3,用r 创建一个新的Row 对象:
R(3, r.b, r.c)
# Row(a=3, b=2, c=3)
虽然r 仍然是:
r
# Row(a=1, b=2, c=3)
【讨论】: