[Question title]: How can I extract length-sensitive features in PySpark without using the .toPandas() hack?
[Posted]: 2021-11-10 16:13:30
[Question]:

I'm new to PySpark, and I want to translate the Feature Extraction (FE) part of a Python script into PySpark. To start, I have a Spark DataFrame called sdf that includes two columns, A and B:

Here is an example:

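+---------------------------------------------------+------------+-----------------+
|data                                               |A           |B                |
+---------------------------------------------------+------------+-----------------+
|https://example1.org/path/to/file?param=42#fragment|path/to/file|param=42#fragment|
|https://example2.org/path/to/file                  |path/to/file|NaN              |
+---------------------------------------------------+------------+-----------------+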

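Now I want to apply some feature engineering to column B, extract the features, and join the results back to sdf. So far I can do this with a plain Python (pandas) script: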

import re
import pandas as pd

#================================> Type <==========================================
def getType(input_value):
  if pd.isna(input_value):
    return "-"
    
  type_ = "-"

  if input_value.isdigit():                                                 # Only numeric
    type_ = "Int"
  elif bool(re.match(r"^[a-zA-Z0-9_]+$", input_value)):                     # Only letters a-zA-Z, digits 0-9 and underscore
    type_ = "String"
  elif bool(re.match(r"^[\d+,\s]+$", input_value)):                         # Only digits, '+', commas and whitespace, e.g. "1, 2, 3"
    type_ = "Array"

  else:  
    existing_separators = re.findall(r"([\+\;\,\:\=\|\\/\#\'\"\t\r\n\s])+", input_value)
    # Sentence: more than one run of separators, or exactly one run
    # whose (last) separator character is not a comma
    if len(existing_separators) > 1 or (len(existing_separators) == 1 and existing_separators[0] != ","):
      type_ = "Sentence"                                                

  return type_


#================================> Length <==========================================
# Number of characters in the parameter value
getLength = lambda input_text: 0 if pd.isna(input_text) else len(input_text)

#================================> Token number <==========================================

double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")

token_number = lambda input_text: 0 if pd.isna(input_text) else len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])

#quick test 
param_example = "url=http://news.csuyst.edu.cn/sem/resource/code/rss/rssfeed.jsp?type=list"
out = double_separators_regex.findall(param_example) + [element for pair in single_separators_regex.findall(param_example) for element in pair if element != ""] 

print(out)        #['url','http','news.csuyst.edu.cn','sem','resource','code','rss','rssfeed.jsp','type','list']
print(len(out))   #10

#===================================> Encoding type <============================================

import base64

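def isBase64(input_value):
  if pd.isna(input_value):
    return False
  try:
    # b64encode returns bytes, so decode it before comparing with the str input
    return base64.b64encode(base64.b64decode(input_value)).decode("ascii") == input_value
  except Exception:
    return False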

#================================> Character feature <==========================================
N = 2

n_grams = lambda input_text: 0 if pd.isna(input_text) else len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))


#quick test 
n_grams_example = 'zhang1997'  #output = ['zh', 'ha', 'an', 'ng', 'g1', '19', '99', '97']
n_grams(n_grams_example)       # 8



#frame the features
# df is the pandas version of sdf (e.g. from sdf.toPandas()); its column B is named "fragment"
features_df = pd.DataFrame()

features_df["Type"] = df.fragment.apply(getType)
features_df["Length"] = df.fragment.apply(getLength)
features_df["Token_number"] = df.fragment.apply(token_number)
features_df["Encoding_type"] = df.fragment.apply(isBase64)
features_df["Character_feature"] = df.fragment.apply(n_grams)

features_df.columns  #Index(['Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature'], dtype='object')
features_df
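Both the `Sentence` rule in `getType` and `token_number` depend on how `re.findall` reports capturing groups. With a single repeated group, it keeps only the last character of each separator run. With two groups, it returns one `(left, right)` tuple per match. The sketch below reuses the script's patterns, with the curly quote `”` restored, to show both effects. The variable names are illustrative, not part of the original script.

```python
import re

# Separator pattern from getType: the group is repeated with "+", so findall
# returns only the LAST character of each run of separators.
separator_runs = re.compile(r"([\+\;\,\:\=\|\\/\#\'\"\t\r\n\s])+")

print(separator_runs.findall("param=42#fragment"))  # ['=', '#'] -> two runs, "Sentence"
print(separator_runs.findall("a,b"))                # [','] -> one comma, not "Sentence"
print(separator_runs.findall("a, b"))               # [' '] -> the run ", " is reported as ' '

# Token pattern from token_number: two optional groups around a separator run,
# so findall returns one (left, right) tuple per match ('' = group not matched).
single_separators_regex = re.compile(
    r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?"
)
param_example = "url=http://news.csuyst.edu.cn/sem/resource/code/rss/rssfeed.jsp?type=list"
pairs = single_separators_regex.findall(param_example)
print(pairs[:2])  # [('url', 'http'), ('', 'news.csuyst.edu.cn')]

# Flattening and dropping the empty strings gives the tokens that are counted.
tokens = [element for pair in pairs for element in pair if element != ""]
print(len(tokens))  # 10 ('?' is not in the separator class, so it is skipped between matches)
```

One consequence is that `getType` returns `"-"` for `"a,b"` but `"Sentence"` for `"a, b"`.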

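Question: what is the best way to translate the FE without converting the Spark DataFrame to a pandas DataFrame with toPandas(), so that the pipeline is optimized and runs 100% in Spark?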

I'm happy to share a Colab notebook for quick debugging and feedback.

The expected output, as a Spark DataFrame, is shown below:

+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|data                |A           |B                |Type    |Length|Token_number |Encoding_type |Character_feature|
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
|https://example1....|path/to/file|param=42#fragment|Sentence|17.0  |3.0          |False         |15.0             |
|https://example2....|path/to/file|Null             |-       |0.0   |0.0          |False         |0.0              |
+--------------------+------------+-----------------+--------+------+-------------+--------------+-----------------+
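The values in the first expected row can be checked by hand in plain Python:

- `Length` is the character count.
- `Token_number` counts the tokens found around separators.
- `Character_feature` counts distinct bigrams; the bigram `ra` occurs twice, so 16 bigrams give 15.

The minimal sketch below copies the token patterns from the question (curly quote restored). Like the expected second row, it maps a null value to zeros. The function name `row_features` is hypothetical.

```python
import re

single_separators_regex = re.compile(
    r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?"
)
double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")

def row_features(text, n=2):
    """Return (Length, Token_number, Character_feature); None maps to zeros."""
    if text is None:
        return 0, 0, 0
    tokens = double_separators_regex.findall(text) + [
        element
        for pair in single_separators_regex.findall(text)
        for element in pair
        if element != ""
    ]
    # All overlapping n-character windows; the set removes duplicates such as 'ra'.
    bigrams = [text[i:i + n] for i in range(len(text) - n + 1)]
    return len(text), len(tokens), len(set(bigrams))

print(row_features("param=42#fragment"))  # (17, 3, 15)
print(row_features(None))                 # (0, 0, 0)
```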

[Question comments]:

    Tags: apache-spark pyspark feature-extraction


    [Answer 1]:

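    I've written some example code for you here. It isn't perfect, but it at least follows your source code and should point you in the right direction for the next steps. I've also added comments on each Spark transformation. Hope it helps.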

    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    
    def count_token(input_text):
        import re
        if input_text is None:
            return 0
        double_separators_regex = re.compile(r"[\<\[\(\{]+[0-9a-zA-Z_\.\-]+[\}\)\]\>]+")
        single_separators_regex = re.compile(r"([0-9a-zA-Z_\.\-]+)?[\+\,\;\:\=\|\\/\#\'‘’\"“”\t\r\n\s]+([0-9a-zA-Z_\.\-]+)?")
        return len(double_separators_regex.findall(input_text) + [element for pair in single_separators_regex.findall(input_text) for element in pair if element != ""])
    
    def n_grams(input_text):
        if input_text is None:
            return 0
        N = 2
        return len(set([input_text[character_index:character_index+N] for character_index in range(len(input_text)-N+1)]))
    
    
    (df
        .withColumn('test', F.base64(F.unbase64('fragment')))  # debug column: base64 round-trip of `fragment` (see Encoding_type)
        .withColumn('Type', F
            .when(F.isnull('fragment'), '-')
            .when(~F.isnull(F.col('fragment').cast('int')), 'Int')
            .when(F.regexp_extract('fragment', '^[a-zA-Z0-9_]+$', 0) == F.col('fragment'), 'String')
            .when(F.regexp_extract('fragment', r'^[\d+,\s]+$', 0) == F.col('fragment'), 'Array') # not sure about this regex?
            .otherwise('Sentence') # not sure about this condition either, but you can utilize
                                   # `regexp_extract` like above and do any kind of comparision
        )
        .withColumn('Length', F
            .when(F.isnull('fragment'), 0)
            .otherwise(F.length('fragment'))
        )
        .withColumn('Token_number', F.udf(count_token, T.IntegerType())('fragment')) # Spark (as of 3.0) doesn't provide a `findall` alternative,
                                                                                     # so we have to use a UDF here; see the documentation:
                                                                                     # http://spark.apache.org/docs/3.0.1/api/python/pyspark.sql.html#pyspark.sql.functions.udf
        .withColumn('Encoding_type', F
            .when(F.isnull('fragment'), False)
            .otherwise(F.base64(F.unbase64(F.col('fragment'))) == F.col('fragment')) # FYI, this is not always correct: you would expect
                                                                                     # `isBase64('param123') == False`, but 'param123' round-trips, so this gives true
        )
        .withColumn('Character_feature', F.udf(n_grams, T.IntegerType())('fragment')) # or you can use more advanced feature from SparkML
                                                                                      # https://spark.apache.org/docs/latest/ml-features.html#n-gram
        .show()
    )
    
    # Output
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
    # |                data|        path|         fragment|test|    Type|Length|Token_number|Encoding_type|Character_feature|
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
    # |https://example1....|path/to/file|param=42#fragment|para|Sentence|    17|           3|        false|               15|
    # |https://example2....|path/to/file|             null|null|       -|     0|           0|        false|                0|
    # +--------------------+------------+-----------------+----+--------+------+------------+-------------+-----------------+
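The round-trip test (decode, re-encode, compare) only checks that a string is well-formed base64. That is why plain words such as `fragment` or `param123` come out as `True`. The plain-Python sketch below mirrors the same heuristic; the name `looks_like_base64` is hypothetical. It also shows why the pandas `isBase64` in the question always printed `False`: in Python 3, `b64encode` returns `bytes`, which never compares equal to a `str`.

```python
import base64
import binascii

def looks_like_base64(value):
    """Round-trip heuristic: decode, re-encode, and compare with the input."""
    if value is None:
        return False
    try:
        # validate=True rejects characters outside the base64 alphabet
        decoded = base64.b64decode(value, validate=True)
    except (binascii.Error, ValueError):
        return False
    # Decode the re-encoded bytes to str so the comparison is str == str.
    return base64.b64encode(decoded).decode("ascii") == value

print(looks_like_base64("param=42#fragment"))  # False: '#' is not in the alphabet
print(looks_like_base64("fragment"))           # True, although it is just a word
print(looks_like_base64("param123"))           # True, the same false positive
```

A stricter feature would need extra rules on top of the round-trip, such as a minimum length or checking that the decoded bytes are valid UTF-8. This is a design choice, not something the question specifies.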
    

    [Comments]:

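    • Thanks for your input. I just applied your solution in the provided Colab notebook and got different results from the Python version, e.g. Encoding_type should be `False`, but your solution gives `True`! Could you check the provided notebook for quick debugging?
    • I looked at your notebook: your example fragment is param=42#fragment, while in the notebook it is fragment.
    • Thanks for debugging. This happened because of incomplete parsing with `from urllib.parse import urlsplit`.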
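The mismatch above comes from how `urllib.parse.urlsplit` splits a URL. The query and the fragment are separate fields, so taking only `.fragment` yields `fragment` rather than `param=42#fragment`. The small sketch below illustrates this. The helper `split_url` is hypothetical, and so is its rule for columns A and B. It takes A as the path without its leading `/` and B as the query and fragment joined with `#`. This is an assumption based on the sample data, not the asker's actual code.

```python
from urllib.parse import urlsplit

def split_url(url):
    """Hypothetical helper: rebuild columns A and B from the sample data.

    A = path without the leading "/"; B = query plus "#fragment",
    or None when both are empty. This mapping is an assumption.
    """
    parts = urlsplit(url)
    b = parts.query + ("#" + parts.fragment if parts.fragment else "")
    return parts.path.lstrip("/"), (b or None)

parts = urlsplit("https://example1.org/path/to/file?param=42#fragment")
print(parts.query, parts.fragment)  # param=42 fragment

print(split_url("https://example1.org/path/to/file?param=42#fragment"))  # ('path/to/file', 'param=42#fragment')
print(split_url("https://example2.org/path/to/file"))                    # ('path/to/file', None)
```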