【Question title】: Databricks Autoloader - Column Transformation - Column is not iterable
【Posted】: 2022-01-11 08:56:21
【Question】:

I am using Azure Databricks Autoloader to process files from ADLS Gen 2 into Delta Lake. I wrote my foreachBatch function (PySpark) as follows:

#Rename incoming dataframe columns
schemadf = transformschema.renameColumns(microBatchDF,fileconfig)

# Apply simple transformation on schemadf using createOrReplaceTempView
modifieddf = applytransform(schemadf,targettable,targetdatabase,fileconfig)

# Add audit cols to modifieddf
transformdf = auditlineage.addauditcols(modifieddf,fileconfig,appid)

Code for renaming columns:

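def renameColumns(dataframe, schema):
  # schema['Schema'] holds the target column names as a comma-separated string
  new_names = schema['Schema'].split(',')
  # Rename positionally: the i-th incoming column gets the i-th configured name
  for old, new in zip(dataframe.columns, new_names):
      dataframe = dataframe.withColumnRenamed(old, new)
  return dataframe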

applytransform code:

def applytransform(inputdf,targettable,targetdatabase,fileconfig):  
  logger.info('Inside applytransform for Database/Table {}.{}',targetdatabase,targettable)
  inputdf.createOrReplaceTempView("src_to_transform")
  lspark = inputdf._jdf.sparkSession()
  if 'TransformQuery' in fileconfig and fileconfig['TransformQuery'] is not None:
    vsqlscript = fileconfig['TransformQuery']
    df = lspark.sql(vsqlscript)    
    logger.info("Applied Transform")
    return df
  else:
    logger.info("Passed DF")
    return inputdf

addauditcols code:

import datetime
from pyspark.sql.functions import input_file_name, lit

def addauditcols(inputdf,fileconfig,app_id):
    now = datetime.datetime.now()
    print(type(inputdf))    
    createdby = 'DatabricksJob-'+app_id
    datasource = fileconfig['Datasource']
    recordactiveind = 'Y'
    df = inputdf.withColumn('datasource',lit(datasource)).\
    withColumn('createdtimestamp',lit(now)).\
    withColumn('lastmodifiedtimestamp',lit(now)).\
    withColumn('createduserid',lit(createdby)).\
    withColumn('lastmodifieduserid',lit(createdby)).\
    withColumn('filepath',input_file_name()).\
    withColumn('recordactiveind',lit(recordactiveind))
    return df

The applytransform module returns a py4j.java_gateway.JavaObject instead of a regular pyspark.sql.dataframe.DataFrame, so I cannot perform simple withColumn()-style transformations on modifieddf in the addauditcols module.

The error I get is as follows:

2021-12-05 21:09:57.274 | INFO     | __main__:main:73 - modifieddf Type::: 
<class 'py4j.java_gateway.JavaObject'>
2021-12-05 21:09:57.421 | ERROR    | __main__:main:91 - Operating Failed for md_customer, with Exception Column is not iterable
Traceback (most recent call last):

  File "c:/Users/asdsad/integration-app\load2cleansed.py", line 99, in <module>
    main()
    └ <function main at 0x000001C570C263A0>

> File "c:/Users/asdsad/integration-app\load2cleansed.py", line 76, in main
    transformdf = auditlineage.addauditcols(modifieddf,fileconfig,appid)
                  │            │            │          │          └ 'local-1638760184357'
                  │            │            │          └ {'Schema': 'customernumber,customername,addrln1,city,statename,statecode,postalcode,countrycode,activeflag,sourcelastmodified...
                  │            │            └ JavaObject id=o48
                  │            └ <function addauditcols at 0x000001C570B55CA0>
                  └ <module 'core.wrapper.auditlineage' from 'c:\\Users\\asdsad\integration-app\\core\\wrapper\\a...

  File "c:\Users\1232\Documents\Code\ntegration-app\core\wrapper\auditlineage.py", line 30, in addauditcols
    df = inputdf.withColumn('datasource',lit(datasource)).\
         │                               │   └ 'DUMMY-CUST'
         │                               └ <function lit at 0x000001C570B79F70>
         └ JavaObject id=o48

  File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1296, in __call__
    args_command, temp_args = self._build_args(*args)
                              │    │            └ ('datasource', Column<'DUMMY-CUST'>)
                              │    └ <function JavaMember._build_args at 0x000001C5704B9280>
                              └ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>

  File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1260, in _build_args
    (new_args, temp_args) = self._get_args(args)
                            │    │         └ ('datasource', Column<'DUMMY-CUST'>)
                            │    └ <function JavaMember._get_args at 0x000001C5704B91F0>
                            └ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>

  File "C:\Users\testapp\lib\site-packages\py4j\java_gateway.py", line 1247, in _get_args
    temp_arg = converter.convert(arg, self.gateway_client)
               │         │       │    │    └ <py4j.java_gateway.GatewayClient object at 0x000001C5705C89A0>
               │         │       │    └ <py4j.java_gateway.JavaMember object at 0x000001C570C5B910>
               │         │       └ Column<'DUMMY-CUST'>
               │         └ <function ListConverter.convert at 0x000001C5704CE5E0>
               └ <py4j.java_collections.ListConverter object at 0x000001C5704C3FD0>

  File "C:\Users\testapp\lib\site-packages\py4j\java_collections.py", line 510, in convert
    for element in object:
                   └ Column<'DUMMY-CUST'>

  File "C:\Users\testapp\lib\site-packages\pyspark\sql\column.py", line 470, in __iter__
    raise TypeError("Column is not iterable")

TypeError: Column is not iterable

Any help is appreciated.

【Comments】:

Tags: python databricks azure-databricks databricks-autoloader


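【Answer 1】:

Please remove lspark = inputdf._jdf.sparkSession().

That pattern is meant for SQL upsert commands against Delta, such as MERGE, which do not return a DataFrame.

Use spark.sql(vsqlscript) instead.

If that does not help, please share your vsqlscript code as well.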
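
A minimal sketch of a related alternative, assuming PySpark 3.3 or later, where DataFrame.sparkSession is available. Inside foreachBatch the micro-batch DataFrame can be bound to a different session than the notebook-level spark, so a temp view registered from it is not necessarily visible to spark.sql. inputdf._jdf.sparkSession() does reach the right session, but through the raw JVM handle, which is why its .sql() returns a py4j JavaObject rather than a Python DataFrame. The function name apply_transform_sketch and the view_name parameter are hypothetical and not part of the original code.

```python
def apply_transform_sketch(inputdf, fileconfig, view_name="src_to_transform"):
    """Run the optional TransformQuery on the session that owns inputdf.

    Hypothetical helper; assumes fileconfig is a dict and PySpark >= 3.3.
    """
    query = fileconfig.get("TransformQuery")
    if not query:
        # No query configured: pass the DataFrame through unchanged.
        return inputdf

    # Register the view in the same session that the query will run in.
    inputdf.createOrReplaceTempView(view_name)

    # DataFrame.sparkSession is the Python SparkSession, so .sql() returns a
    # pyspark.sql.DataFrame that supports withColumn(), not a JavaObject.
    return inputdf.sparkSession.sql(query)
```

On older PySpark versions this property does not exist. A workaround sometimes seen is wrapping the Java result back into a Python DataFrame, for example DataFrame(lspark.sql(vsqlscript), inputdf.sql_ctx); treat that as an assumption to verify against the installed version.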

【Comments】:

  • When I replace it, I get pyspark.sql.utils.AnalysisException: Table or view not found: src_to_transform; line 1 pos 165; 'Project ['customernumber, 'customername, 'addrln1, 'city, 'statecode AS statename#1104, 'split('statename, /)[1] AS statecode#1105, 'postalcode, 'split('statename, /)[0] AS countrycode#1106] +- 'UnresolvedRelation [src_to_transform], [], false
  • vsqlscript: select customernumber,customername,addrln1,city,statecode as statename,split(statename, '/')[1] as statecode,postalcode,split(statename, '/')[0] as countrycode from src_to_transform
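【Answer 2】:

Thanks. Instead of createOrReplaceTempView, I switched to createOrReplaceGlobalTempView and updated vsqlscript to read from global_temp.src_to_transform (for example, select * from global_temp.src_to_transform). I made the following changes in applytransform: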

def applytransform(inputdf,targettable,targetdatabase,fileconfig):
  
  logger.info('Inside applytransform for Database/Table {}.{}',targetdatabase,targettable)
  
  # Store the view in the global temp database (global_temp)
  inputdf.createOrReplaceGlobalTempView("src_to_transform")
  lspark = inputdf._jdf.sparkSession()
  if 'TransformQuery' in fileconfig and fileconfig['TransformQuery'] is not None:
    vsqlscript = fileconfig['TransformQuery']
    #df = lspark.sql(vsqlscript)
    df = spark.sql(vsqlscript)    
    logger.info("Applied Transform")
    df.show()
    return df
  else:
    logger.info("Passed DF")
    return inputdf 
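
The answer above also requires every configured TransformQuery to reference global_temp.src_to_transform instead of the bare view name. A minimal sketch of doing that rewrite in code rather than by editing each config entry follows; the helper name qualify_global_view is hypothetical, and the regex is a simplification that would also touch the name if it appeared inside a SQL string literal.

```python
import re


def qualify_global_view(query, view_name="src_to_transform", database="global_temp"):
    """Prefix bare references to view_name with the global temp database.

    Hypothetical helper: already-qualified references are left untouched.
    """
    # (?<![\w.]) skips names that are already qualified (preceded by ".")
    # or that are the tail of a longer identifier; (?!\w) skips longer names.
    pattern = r"(?<![\w.])" + re.escape(view_name) + r"(?!\w)"
    return re.sub(pattern, f"{database}.{view_name}", query)


original = "select customernumber, customername from src_to_transform"
print(qualify_global_view(original))
```

Global temp views are shared by all sessions of the same Spark application, so two streams that register the same view name can overwrite each other's view. Using a distinct view name per stream would be one way to avoid that.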

【Comments】:
