【问题标题】:pySpark mapping multiple variablespySpark 映射多个变量
【发布时间】:2020-09-09 20:02:01
【问题描述】:

下面的代码将我的参考 df 的值和列名映射到我的实际数据集,查找完全匹配,如果找到完全匹配,则返回 OutputValue。但是,我正在尝试添加规则,即当PrimaryValue = DEFAULT 也返回OutputValue

我正在尝试解决此问题的解决方案是创建一个具有空值的新数据框 - 因为下面的代码没有提供匹配项。因此,下一步将针对空值,其对应的PrimaryValue = DEFAULT 将空值替换为OutputValue

  #create a map based on columns from reference_df
  map_key = concat_ws('\0', final_reference.PrimaryName, final_reference.PrimaryValue)
  map_value = final_reference.OutputValue

  #dataframe of concatinated mappings to get the corresponding OutputValues from reference table
  d = final_reference.agg(collect_set(array(concat_ws('\0','PrimaryName','PrimaryValue'), 'OutputValue')).alias('m')).first().m
  #display(d)

  #iterate through mapped values 
  mappings = create_map([lit(i) for i in chain.from_iterable(d)])

  #dataframe with corresponding matched OutputValues
  dataset = datasetM.select("*",*[ mappings[concat_ws('\0', lit(c), col(c))].alias(c_name) for c,c_name in matched_List.items()]) 
  display(dataset)

【问题讨论】:

  • 你的意思是当来自primaryLookupAttributeName_List的请求的col-name在datasetMatchedPortfolio中不存在时会产生错误?所以你想添加一个默认名称来通过错误?
  • @jxc,所以没有错误。它只是用空值填充列。数据集永远不会包含 DEFAULT ,它将具有常规值。当PrimaryLookupAttributeName 是默认值时,我想用相应的OutputItemNameByValue 替换那些空值(未找到匹配项)。我会用更多信息更新我的问题!
  • 很可能,您只需要合并,例如:coalesce(mappings[concat_ws('\0', lit(c), col(c))], lit("DEFAULT")).alias(c_name)。确保导入 pyspark.sql.functions.coalesce
  • @jxc,对不起,您是打算对空 df 执行此操作还是将其作为我最初的 datasetPrimaryAttributes_False = 的一部分包含在内
  • @jxc,您是否建议在此处包含带有 if/else/elif 循环的 udf?我有 3 种匹配情况:1)如果找到匹配,则复制 outputValue,2)如果默认,复制 outputValue,3)如果根本没有匹配并且 null,“找不到查找”。否则,到目前为止,我的想法是继续构建过滤数据框,直到我的最后一个案例并且所有数据集值都有相应的更新值。

标签: python dataframe pyspark apache-spark-sql mapping


【解决方案1】:

根据 cmets 中的讨论,我认为您只需从现有映射中添加一个默认映射,然后使用 coalease() 函数查找第一个非空值,如下所示:

from pyspark.sql.functions import collect_set, array, concat_ws, lit, col, create_map, coalesce

# skip some old code

d    
#[['LeaseStatus\x00Abandoned', 'Active'],
# ['LeaseStatus\x00DEFAULT', 'Pending'],
# ['LeaseRecoveryType\x00Gross-modified', 'Modified Gross'],
# ['LeaseStatus\x00Archive', 'Expired'],
# ['LeaseStatus\x00Terminated', 'Terminated'],
# ['LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'],
# ['LeaseRecoveryType\x00Gross', 'Gross']]

# original mapping
mappings = create_map([ lit(j) for i in d for j in i ])

# default mapping
mappings_default = create_map([ lit(j.split('\0')[0]) for i in d if i[0].upper().endswith('\x00DEFAULT') for j in i ])
#Column<b'map(LeaseStatus, Pending)'>

# a set of available PrimaryLookupAttributeName
available_list = set([ i[0].split('\0')[0] for i in d ])
# {'LeaseRecoveryType', 'LeaseStatus'}

# use coalesce to find the first non-null values from mappings, mappings_defaul etc
datasetPrimaryAttributes_False = datasetMatchedPortfolio.select("*",*[ 
  coalesce(
    mappings[concat_ws('\0', lit(c), col(c))],
    mappings_default[c],
    lit("Not Specified at Source" if c in available_list else "Lookup not found")
  ).alias(c_name) for c,c_name in matchedAttributeName_List.items()])

一些解释

(1) d 是从reference_df 中检索到的列表列表,我们使用列表推导[ lit(j) for i in d for j in i ] 将其展平为列表并将展平后的列表应用于create_map 函数:

(2) mappings_default 与上面类似,但添加了一个if 条件作为过滤器,并且只保留PrimaryLookupAttributeValue(这是内部列表i[0] 的第一项)以@987654327 结尾的条目@ 然后使用 split 从 map_key 中剥离 PrimaryLookupAttributeValue(基本上是 \x00DEFAULT)。

【讨论】:

  • 你能解释一下mappings
  • 其实我明白。我会重写它们来描述变量名,谢谢!另外,我刚刚意识到我实际上还有另一种情况。 1)如果找到匹配,则复制 outputValue,2)如果 DEFAULT,复制 outputValue,3)如果根本没有匹配并且为 null,则“找不到查找”4)如果数据集中有 null 值。例如, dataset.LeaseRecoveryType 有空值,所以不匹配不是因为没有 reference_table 值,而是因为实际数据集没有提供值。所以如果数据集值=null,则返回“Not Specified at Source” ....@jxc
  • 不要认为它有效。当 dataset.LeaseRecoveryType = null 时,我仍然只得到“查找未找到”。现在正在玩它@jxc
  • if c in available_list改成if c not in available_list怎么样?
  • 是的!我也试过了,它有效..但是,很难理解为什么
猜你喜欢
  • 2020-09-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多