【问题标题】:How to reference a dataframe when in an UDF on another dataframe?在另一个数据帧上的 UDF 中如何引用数据帧?
【发布时间】:2017-05-14 10:05:25
【问题描述】:

在另一个数据帧上执行 UDF 时如何引用 pyspark 数据帧?

这是一个虚拟示例。我正在创建两个数据框scoreslastnames,每个数据框内都有一个在两个数据框中相同的列。在scores 上应用的UDF 中,我想过滤lastnames 并返回在lastname 中找到的字符串。

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *

sc = SparkContext("local")
sqlCtx = SQLContext(sc)


# Generate Random Data
import itertools
import random
student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)
data = []

for (student_id, subject) in itertools.product(student_ids, subjects):
    data.append((student_id, subject, random.randint(0, 100)))

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("subject", StringType(), nullable=False),
            StructField("score", IntegerType(), nullable=False)
    ])

# Create DataFrame 
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)

# create another dataframe
last_name = ["Granger", "Weasley", "Potter"]
data2 = []
for i in range(len(student_ids)):
    data2.append((student_ids[i], last_name[i]))

schema = StructType([
            StructField("student_id", StringType(), nullable=False),
            StructField("last_name", StringType(), nullable=False)
    ])

rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)


scores.show()
lastnames.show()


from pyspark.sql.functions import udf
def getLastName(sid):
    tmp_df = lastnames.filter(lastnames.student_id == sid)
    return tmp_df.last_name

getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)

以下是trace的最后一部分:

Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

【问题讨论】:

  • 您不能在 UDF 中访问 df,因为它将在 executor 中处理,而 df ref 只能从驱动程序访问。您可以为lastnames 使用广播变量。如果需要任何帮助,请告诉我。
  • 但请考虑将lastnamesscores 一起加入,而不是从UDF 中加入。
  • 嗨@mrsrinivas,感谢您的回复。首先,我无法加入,因为即使这个虚拟示例可以使用加入来解决,但在我的实际实现中,我需要在 UDF 中进行更多处理。其次,是的!在这种情况下如何使用广播变量?

标签: apache-spark dataframe pyspark user-defined-functions broadcast


【解决方案1】:

将对更改为字典以便于查找名称

data2 = {}
for i in range(len(student_ids)):
    data2[student_ids[i]] = last_name[i]

而不是创建rdd 并使其成为df 创建广播变量

//rdd = sc.parallelize(data2) 
//lastnames = sqlCtx.createDataFrame(rdd, schema)
lastnames = sc.broadcast(data2)  

现在在广播变量 (lastnames) 上使用 values attr 在 udf 中访问它。

from pyspark.sql.functions import udf
def getLastName(sid):
    return lastnames.value[sid]

【讨论】:

  • 我用广播变量修改了你的实现。尽量让你的UDF尽可能纯函数,过多的外部依赖可能会降低性能。
  • 我尝试了你的代码 sn-p - 当我查看 lastnames.value 时,我得到了 [('student1', 'Granger'), ('student2', 'Weasley'), ('student3', 'Potter')],这意味着 lastnames.value.filter 不再工作了,对吧?
  • 是的,看来。在udf中尝试return lastnames.value["sid"]并创建一个字典(变量data2),其中sid作为键,值作为lastname
  • onvery pd df 来激发 df
【解决方案2】:

您不能从 UDF 中直接引用数据帧(或 RDD)。 DataFrame 对象是驱动程序的句柄,Spark 使用它来表示将在集群上发生的数据和操作。在 Spark 选择的时候,UDF 中的代码将在集群上用完。 Spark 通过序列化该代码并复制闭包中包含的任何变量并将它们发送给每个工作人员来做到这一点。

您想要做的是使用 Spark 在其 API 中提供的构造来连接/组合两个 DataFrame。如果其中一个数据集很小,您可以手动发送广播变量中的数据,然后从您的 UDF 访问它。否则,您可以像以前一样创建两个数据框,然后使用连接操作将它们组合起来。像这样的东西应该可以工作:

joined = scores.withColumnRenamed("student_id", "join_id")
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\
               .drop("join_id")
joined.show()

+---------+-----+----------+---------+
|  subject|score|student_id|last_name|
+---------+-----+----------+---------+
|     Math|   13|  student1|  Granger|
|  Biology|   85|  student1|  Granger|
|Chemistry|   77|  student1|  Granger|
|  Physics|   25|  student1|  Granger|
|     Math|   50|  student2|  Weasley|
|  Biology|   45|  student2|  Weasley|
|Chemistry|   65|  student2|  Weasley|
|  Physics|   79|  student2|  Weasley|
|     Math|    9|  student3|   Potter|
|  Biology|    2|  student3|   Potter|
|Chemistry|   84|  student3|   Potter|
|  Physics|   43|  student3|   Potter|
+---------+-----+----------+---------+

还值得注意的是,在 Spark DataFrames 的底层有一个优化,如果它足够小,作为连接的一部分的 DataFrame 可以转换为广播变量以避免随机播放。因此,如果您使用上面列出的连接方法,您应该可以获得最佳性能,而不会牺牲处理更大数据集的能力。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-28
    • 1970-01-01
    • 2020-09-18
    相关资源
    最近更新 更多