【问题标题】:Spark Dataframe Pass Row rdd to a Python FunctionSpark Dataframe 将 Row rdd 传递给 Python 函数
【发布时间】:2016-12-20 14:34:53
【问题描述】:

我有一个三列 1000 行的 spark 数据框。 (整数、整数和日期类型)。我有一个单独的 python 函数,它获取每一行的每个值并进行一些处理。如何传递这三个值迭代并将输出收集到数据框

【问题讨论】:

  • 请在问题中显示您的代码。
  • 请分享您的代码。

标签: apache-spark


【解决方案1】:

以下示例使用了您可能缺少的两个位:

  • DataFrame 可以表示为行对象的 RDD
  • 可以随意在数据的 DataFrame 和 RDD 表示之间移动

您会看到对DataFrame.rdd.map()RDD.toDF() 的调用。这些是促进这两种表示之间转换的方法。

from pyspark import SparkConf, SparkContext, HiveContext
from datetime import datetime, timedelta

# boring setup
sconf = SparkConf()
sconf.setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=sconf)
hc = HiveContext(sc)


# define your transformation functions
def process_column_a(val):
    return val * 2

def process_column_b(val):
    return val * 3

def process_column_c(val):
    return val + timedelta(days=1)

# this wrapper isn't required but makes calling the transformations easier
def process_row(val_a, val_b, val_c):
    return (process_column_a(val_a), 
            process_column_b(val_b), 
            process_column_c(val_c))


# mocking up some data in the shape you specified
data = ((i, -i, datetime.now() + timedelta(days=i)) for i in range(1000))
initial_dataframe = hc.createDataFrame(data, ["col_a", "col_b", "col_c"])

# call the processing functions in a map over an rdd representation of the data
processed_rdd = initial_dataframe.rdd.map(lambda x: process_row(*x))

# convert the processed rdd back to a dataframe
finished_df = processed_rdd.toDF(initial_dataframe.columns)

# examine the result
finished_df.show()

【讨论】:

  • 问题是你试图在 RDD 中循环一个 RDD,这是不可能的。而不是运行循环,而是将整个数据放在一起然后对其执行操作。
  • 亲爱的@UdayShankarSingh,请看一下这篇文章:stackoverflow.com/q/65916044/6640504
猜你喜欢
  • 2015-09-11
  • 2017-06-13
  • 1970-01-01
  • 1970-01-01
  • 2017-02-03
  • 2022-09-27
  • 2019-08-09
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多