【问题标题】:Unable to use Pandas UDF in Databricks无法在 Databricks 中使用 Pandas UDF
【发布时间】:2021-04-16 17:52:32
【问题描述】:

我必须运行一个脚本,该脚本将一些参数作为输入并返回一些结果作为输出,所以首先我在本地机器上开发了它 - 工作正常 - 我现在的目标是在 Databricks 中运行它以并行化它.

当我尝试并行化它时,问题就出现了。我正在从已安装的 Datalake 中获取数据(问题不存在,因为我能够在读取 DataFrame 后打印它),将其转换为 Spark DataFrame 并将每一行传递给按材料分组的主函数:

import pandas as pd
import os
import numpy as np
import scipy.stats as stats

from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType,StructField,IntegerType,FloatType

# Pandas udf
schema = StructType([StructField('Material', IntegerType(), True),
                    StructField('Alpha', IntegerType(), True),
                    StructField('Beta', IntegerType(), True),
                    StructField('Sales', IntegerType(), True),
                    StructField('SL', FloatType(), True)])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def main(data):
    material = data['Material'].iloc[0]
    print(material)      #<-------- THIS IS NOT PRINTING
    print('Hello world')   #<------ NEITHER IS THIS

    start = data['start '].iloc[0]
    end = data['end '].iloc[0]
    mu_lt = data['mu_lt'].iloc[0]
    sigma_lt = data['sigma_lt'].iloc[0]
    
    df = pd.DataFrame(columns=('Material', 'Alpha', 'Beta', 'Sales', 'SL'))
    
    for beta in range(1, 2):
        for alpha in range(3, 5):
            # Do stuff
    
    return df


if __name__ == '__main__':
  spark = SparkSession.builder.getOrCreate()
  params = pd.read_csv('/dbfs/mnt/input/params_input.csv')
  params_spark = spark.createDataFrame(params) 

  params_spark.groupby('Material').apply(main).show()

我不确定我是否将 DF 正确传递给主函数,甚至声明它是正确的,但主函数中定义的打印和 DF 似乎都没有运行。代码没有抛出错误,但也没有返回任何输出。

【问题讨论】:

  • print 语句在用于 udf 函数时不起作用。
  • 我如何知道数据(例如材料)被正确读取?
  • 您可以使用显示或显示功能来检查数据框本身。但这就是它的延伸。 Spark 使用惰性求值,但 UDF 只是绕过它,所以我不确定你是否可以用其他方式调试它。
  • 但是如果我在 main 函数中写 'data.show()' 是正常的吗?
  • 你只需要在 executors 上查看这些数据...

标签: python pandas apache-spark pyspark databricks


【解决方案1】:

试试this:

@pandas_udf('y int, ds int, store_id string, product_id string, log string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    return pd.DataFrame([3, 5, 'store123', 'product123', 'My log message'], columns=['y', 'ds','store_id','product_id', 'log'])

【讨论】:

  • 日志不是即时的,而是通过这种方式找到了错误!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-09-08
  • 2023-01-23
  • 2020-01-05
  • 2021-05-06
  • 1970-01-01
  • 1970-01-01
  • 2022-01-16
相关资源
最近更新 更多