【发布时间】:2016-12-20 14:34:53
【问题描述】:
我有一个三列 1000 行的 spark 数据框。 (整数、整数和日期类型)。我有一个单独的 python 函数,它获取每一行的每个值并进行一些处理。如何传递这三个值迭代并将输出收集到数据框
【问题讨论】:
-
请在问题中显示您的代码。
-
请分享您的代码。
标签: apache-spark
我有一个三列 1000 行的 spark 数据框。 (整数、整数和日期类型)。我有一个单独的 python 函数,它获取每一行的每个值并进行一些处理。如何传递这三个值迭代并将输出收集到数据框
【问题讨论】:
标签: apache-spark
以下示例使用了您可能缺少的两个位:
您会看到对DataFrame.rdd.map() 和RDD.toDF() 的调用。这些是促进这两种表示之间转换的方法。
from pyspark import SparkConf, SparkContext, HiveContext
from datetime import datetime, timedelta
# boring setup
sconf = SparkConf()
sconf.setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=sconf)
hc = HiveContext(sc)
# define your transformation functions
def process_column_a(val):
return val * 2
def process_column_b(val):
return val * 3
def process_column_c(val):
return val + timedelta(days=1)
# this wrapper isn't required but makes calling the transformations easier
def process_row(val_a, val_b, val_c):
return (process_column_a(val_a),
process_column_b(val_b),
process_column_c(val_c))
# mocking up some data in the shape you specified
data = ((i, -i, datetime.now() + timedelta(days=i)) for i in range(1000))
initial_dataframe = hc.createDataFrame(data, ["col_a", "col_b", "col_c"])
# call the processing functions in a map over an rdd representation of the data
processed_rdd = initial_dataframe.rdd.map(lambda x: process_row(*x))
# convert the processed rdd back to a dataframe
finished_df = processed_rdd.toDF(initial_dataframe.columns)
# examine the result
finished_df.show()
【讨论】: