【问题标题】:passing DF to utility as parameter and trying to access the DF in python将 DF 作为参数传递给实用程序并尝试在 python 中访问 DF
【发布时间】:2022-01-16 23:46:03
【问题描述】:

我在 test.py 中创建了一个数据框并调用了一个实用程序 test1.py 并将该数据框作为参数传递给该实用程序。当我尝试在实用程序中使用df.rdd.collect() 时,它不起作用。像变量一样出现错误没有像rdd这样的任何参数。你能帮我看看如何在实用程序中读取数据框吗?

例如:

#test.py

from pyspark.sql import SparkSession
from test1 import samplefunction


spark = SparkSession.builder.app("examples").getOrCreate
dat = [(1234, "2021-12-13","2021-12-13T17:55:26+0000",10),(1234, "2021-12-12","2021-12-12T17:55:26+0000",1),(1234, "2021-12-11","2021-12-11T17:55:26+0000",621))]
col = [transid,create_dt,create_ts,purge]

df = spark.CreateDateframe(data = dat, schema = col)

result = samplefunction(df)
print(result)

#test1.py

def samplefunction(df):
 total = 0
 msg = ""
  for row in df.rdd.collect():
    value1 = f'Transaction ID : {int(row.col1 value)}\n'
    value2 = f'Created Date : {str(row.col2 value)}\n'
    value3 = f'Created Time : {str(row.col3 value)}\n'
    value4 = f'Purged : {int(row.col4 value)}\n'

    total = total + row.col4 value
    msg = msg + value1 + value2 + value3 + value4
  return msg

【问题讨论】:

  • 嗨,实际上我已经创建了数据框并创建了通用实用程序(test1.py)来以某种特定格式打印数据。当我将数据框传递给该实用程序时,实用程序并未将其视为数据框,而是将其选为参数。所以无法执行 rdd.collect() 操作。我已经用更多细节更新了我的代码。
  • 发送 rdd df 很重要,您能查看我的更新吗?
  • 谢谢,我已经检查并尝试了,似乎我们无法将数据帧作为参数传递给另一个模块中的另一个函数。我正在尝试使用列表列表,让您知道它是否有效。

标签: python dataframe pyspark


【解决方案1】:

您的代码中存在基本错误。但是您可以使用我认为您正在描述的以下代码;

from pyspark.sql import SparkSession

def samplefunction(df):
    total = 0
    msg = ""
    for row in df.rdd.collect():
        value1 = f'Transaction ID : {int(row["transid"])}\n'
        value2 = f'Created Date : {str(row["create_dt"])}\n'
        value3 = f'Created Time : {str(row["create_ts"])}\n'
        value4 = f'Purged : {int(row["purge"])}\n'

        total += int(row["purge"])
        msg = msg + value1 + value2 + value3 + value4


    return total

if __name__ == '__main__':

    spark = SparkSession.builder.getOrCreate()
    dat = [(1234, "2021-12-13", "2021-12-13T17:55:26+0000", 10), (1234, "2021-12-12", "2021-12-12T17:55:26+0000", 1), (1234, "2021-12-11", "2021-12-11T17:55:26+0000", 621)]
    col = ['transid', 'create_dt', 'create_ts', 'purge']

    rdd = spark.sparkContext.parallelize(dat)
    dfFromRDD1 = rdd.toDF(col)

    result = samplefunction(dfFromRDD1)
    print(result)


>>> 632
  1. 您不能像您在 samplefunction 函数中描述的那样添加整数和字符串
  2. 将 df 从 rdd 转换为 Df 后,您可以使用 .collect() 进行迭代。 Ps:请说明这是否是您要查找的内容。

【讨论】:

  • 嗨,我的要求不同,我已经更新了请求的更多细节。
猜你喜欢
  • 2019-08-21
  • 2019-12-21
  • 1970-01-01
  • 1970-01-01
  • 2018-04-14
  • 1970-01-01
  • 2016-01-20
  • 1970-01-01
  • 2021-01-10
相关资源
最近更新 更多