【发布时间】:2021-04-01 13:28:04
【问题描述】:
下面的代码是基于udf dictionary broadcast 博客发布的。在下面的代码中,广播字典应该返回一个字典列表,即预期从广播字典返回 {'key':[{'key':'value','key2':'value'},{'key':'value ','key2':'值'}]
当我尝试从列表中的字典访问任何键时,不会返回任何值。我知道这一点是因为我收到错误“分配前引用的局部变量 'tmp'”。此变量应包含由列表中每个字典中定义的操作产生的计算值。谁能告诉我为什么或如何调试这个或为什么字典键没有返回值?我在返回广播密钥的代码中添加了打印语句,但它从未出现在输出或驱动程序日志中。
广播词典
calc dict {'name': 'Lvl1_Name_Score', 'operations': [{'operator': 'multi', 'value': 1.5}, {'operator': 'div', 'value': 1}]}
def apply_weights_a(calc_broadcasted,key):
def apply_weights(col_name):
calc_operations = calc_broadcasted.value.get(key)
results = 0
print(f'calc_operations {calc_operations}')
for op in calc_operations:
print(f'op {op}')
# for i,op in enumerate(calc['dependencies']
if op['operator'] == 'div': tmp = col_name / float(op['value'])
elif op['operator'] == 'multi': tmp = col_name * float(op['value'])
elif op['operator'] == 'sub':tmp = col_name - float(op['value'])
elif op['operator'] == 'add': tmp = col_name + float(op['value'])
# sdf = sdf['tmp'].apply(lambda x: 0 if x < 0 else x)
results = results + tmp
return results
return udf(apply_weights)
def multi_apply_weights(col_names,weights):
def inner(sdf):
for col_name in col_names:
#print(f"weights {weights}")
size = len(col_name) #get col_name without _c
calc = [mylist for mylist in weights if mylist['name'] == col_name[:size-2] ]
#print(f"calc {calc}")
calc_dict = calc[0]
#print(f"calc dict {calc[0]}")
b = spark.sparkContext.broadcast(calc_dict)
sdf = sdf.withColumn(
col_name,
apply_weights_a(b,'operations')(col_name)
)
return sdf
return inner
sdf = multi_apply_weights(weight_list,config['algorithmScores'])(sdf)
此 spark 代码是转换此 pandas 函数的结果,该函数在一组列上执行 1 到多个不同的计算...
def apply_weights(df, weights):
for calc in weights:
col = calc['name']
for op in calc['operations']:
if op['operator'] == 'div':
df[col + '_w'] = df[col]/float(op['value'])
elif op['operator'] == 'multi':
df[col+ '_w'] = df[col]*float(op['value'])
elif op['operator'] == 'sub':
df[col+ '_w'] = df[col]-float(op['value'])
elif op['operator'] == 'add':
df[col+ '_w'] = df[col]+float(op['value'])
df[col] = df[col].apply(lambda x: 0 if x < 0 else x)
return df
【问题讨论】:
-
如果你让你的代码更简约,你可能会有更好的运气?有很多代码要进入。见How to create a Minimal, Reproducible Example。
标签: python dictionary user-defined-functions databricks