【问题标题】:Using dictionary in regexp_replace function in pyspark在 pyspark 的 regexp_replace 函数中使用字典
【发布时间】:2019-04-17 19:06:16
【问题描述】:

我想使用字典对 pyspark 数据框列执行 regexp_replace 操作。

字典:{'RD':'ROAD','DR':'DRIVE','AVE':'AVENUE',....} 字典将有大约 270 个键值对。

输入数据框:

ID  | Address    
1   | 22, COLLINS RD     
2   | 11, HEMINGWAY DR    
3   | AVIATOR BUILDING    
4   | 33, PARK AVE MULLOHAND DR

所需的输出数据帧:

ID   | Address  | Address_Clean    
1    | 22, COLLINS RD    | 22, COLLINS ROAD    
2    | 11, HEMINGWAY DR     | 11, HEMINGWAY DRIVE    
3    | AVIATOR BUILDING      | AVIATOR BUILDING    
4    | 33, PARK AVE MULLOHAND DR    | 33, PARK AVENUE MULLOHAND DRIVE

我在互联网上找不到任何文档。如果尝试按以下代码传递字典-

data=data.withColumn('Address_Clean',regexp_replace('Address',dict))

抛出错误“regexp_replace 需要 3 个参数,2 个给定”。

数据集的大小约为 2000 万。因此,UDF 解决方案会很慢(由于按行操作)并且我们无法访问支持 pandas_udf 的 spark 2.3.0。 除了使用循环之外,还有其他有效的方法吗?

【问题讨论】:

  • 尝试解决方案here
  • 谢谢。但如果我理解正确,这个解决方案基本上是使用循环来替换键值对。如底部评论的链接中所述,这会导致缩放问题。是否没有其他不需要循环的方法,因为它会减慢 2000 万行数据集的处理速度
  • 如果您阅读链接解决方案中的说明,您会发现 spark 实际上并没有循环。同样在另一个用户的情况下,缩放问题出现是因为他试图进行 10000 次替换 - 问题在于替换的数量,而不是行数。您的 270 次替换可能会起作用,但如果不是,您可以尝试每 N 次(例如 10 次)替换缓存一次 DataFrame。唯一的其他方法可能是分区/过滤您的数据框并仅应用替换的子集。 (即过滤包含"RD" 等)。

标签: regex dictionary pyspark spark-dataframe


【解决方案1】:

它给你这个错误是因为 regexp_replace() 需要三个参数:

regexp_replace('column_to_change','pattern_to_be_changed','new_pattern')

但你是对的,这里不需要 UDF 或循环。你只需要更多的正则表达式和一个看起来和你原来的目录一模一样的目录表:)

这是我的解决方案:

# You need to get rid of all the things you want to replace. 
# You can use the OR (|) operator for that. 
# You could probably automate that and pass it a string that looks like that instead but I will leave that for you to decide.

input_df = input_df.withColumn('start_address', sf.regexp_replace("original_address","RD|DR|etc...",""))


# You will still need the old ends in a separate column
# This way you have something to join on your directory table.

input_df = input_df.withColumn('end_of_address',sf.regexp_extract('original_address',"(.*) (.*)", 2))


# Now we join the directory table that has two columns - ends you want to replace and ends you want to have instead.

input_df = directory_df.join(input_df,'end_of_address')


# And now you just need to concatenate the address with the correct ending.

input_df = input_df.withColumn('address_clean',sf.concat('start_address','correct_end'))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-18
    • 2021-09-14
    • 2021-10-14
    • 2015-03-12
    • 1970-01-01
    • 1970-01-01
    • 2016-02-23
    • 2018-12-06
    相关资源
    最近更新 更多