【Question title】: Pandas UDF (PySpark) - Incorrect type Error
【Posted】: 2020-09-01 05:00:05
【Question】:

I'm trying to do entity extraction with spaCy and a Pandas UDF (PySpark), but I get an error.
A regular UDF works fine, but it is slow. What am I doing wrong?

I load the model on every call to avoid the loading error: Can't find model 'en_core_web_lg'. It doesn't seem to be a shortcut link, a Python package or a valid path to a data directory.

Working UDF:

import spacy
from pyspark.sql import functions as F
from pyspark.sql import types as T

def __get_entities(x):

    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)

    return ents

get_entities_udf = F.udf(__get_entities, T.ArrayType(T.StringType()))

Pandas UDF that fails:

import pandas as pd

def __get_entities(x):

    global nlp
    nlp = spacy.load("en_core_web_lg")
    ents=[]

    doc = nlp(x)

    for ent in doc.ents:
        if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
            ents.append(ent.label_)

    return pd.Series(ents)


get_entities_udf = F.pandas_udf(lambda x: __get_entities(x), "array<string>", F.PandasUDFType.SCALAR)

Error message:

TypeError: Argument 'string' has incorrect type (expected str, got series)
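The error comes from the calling convention: a scalar pandas UDF receives a whole batch of the column as a `pd.Series`, not one `str` per row, so `nlp(x)` is handed a Series. The sketch below reproduces this without Spark or spaCy, using a hypothetical `fake_nlp` that, like spaCy's `nlp`, only accepts strings.

```python
import pandas as pd


def fake_nlp(text):
    """Hypothetical stand-in for nlp(text): like spaCy, it only accepts str."""
    if not isinstance(text, str):
        raise TypeError(f"expected str, got {type(text).__name__}")
    return text.split()


batch = pd.Series(["John Doe", "Apple Inc."])  # what a scalar pandas UDF receives

error_message = None
try:
    fake_nlp(batch)              # the question's mistake: the whole batch at once
except TypeError as exc:
    error_message = str(exc)

per_row = batch.apply(fake_nlp)  # one call per element, each receiving a str
```

Calling the string-only function per element is exactly what the `x.apply` fix in the first answer does.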

Sample Spark DataFrame:

df = spark.createDataFrame([
  ['John Doe'],
  ['Jane Doe'],
  ['Microsoft Corporation'],
  ['Apple Inc.'],
]).toDF("name")

New column:

df_new = df.withColumn('entity',get_entities_udf('name'))

【Discussion】:

    Tags: pandas apache-spark pyspark user-defined-functions spacy


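    【Answer 1】:

    You need to treat the input as a pd.Series rather than as a single value.

    I was able to get it working with a small refactor. Note the x.apply call: it is a pandas method that applies the function to each element of the pd.Series.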

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    def entities(x):
        global nlp
        import spacy
        nlp = spacy.load("en_core_web_lg")
        ents=[]
    
        doc = nlp(x)
    
        for ent in doc.ents:
            if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
                ents.append(ent.label_)
        return ents
    
    
    def __get_entities(x):
        return x.apply(entities)
    
    get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)
    
    df_new = df.withColumn('entity',get_entities_udf('name'))
    
    df_new.show()
    
    +--------------------+--------+
    |                name|  entity|
    +--------------------+--------+
    |            John Doe|[PERSON]|
    |            Jane Doe|[PERSON]|
    |Microsoft Corpora...|   [ORG]|
    |          Apple Inc.|   [ORG]|
    +--------------------+--------+
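Besides calling the per-row function through `x.apply`, this refactor also fixes the output shape: Spark expects a scalar pandas UDF to return one value per input row, while the question's `pd.Series(ents)` has one entry per entity. A minimal sketch of that contract with plain pandas, assuming a hypothetical `FAKE_LABELS` lookup in place of spaCy's `doc.ents`:

```python
import pandas as pd

# Hypothetical NER output per text, standing in for [ent.label_ for ent in doc.ents]
FAKE_LABELS = {
    "John Doe": ["PERSON"],
    "Microsoft Corporation": ["ORG", "GPE"],
    "Apple Inc.": ["ORG"],
}


def entities(text):
    # per-row logic from the answer: keep only PERSON/ORG labels
    return [label for label in FAKE_LABELS.get(text, []) if label in ("PERSON", "ORG")]


def get_entities(batch: pd.Series) -> pd.Series:
    # one list per input row, so len(output) == len(batch)
    return batch.apply(entities)


batch = pd.Series(["John Doe", "Microsoft Corporation", "Apple Inc.", "unknown text"])
result = get_entities(batch)
```

Rows with no matching entity get an empty list, so the output length always equals the input length.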
    

    【Discussion】:

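      【Answer 2】:

      I'm using: pyspark 3.1.1, python 3.7

      The answer above didn't work for me, and it took me a lot of time to get things working, so I thought I'd share the solution I came up with.

      Setup

      Create a sample of 16 random person and company names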

      import pandas as pd
      
      from pyspark.sql import SparkSession
      from pyspark.sql import functions as F
      from pyspark.sql.types import StringType, ArrayType
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      
      from faker import Faker
      import spacy
      
      spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()
      
      names = []
      fake = Faker()
      for _ in range(8):
          names.append(f"{fake.company()} {fake.company_suffix()}")
          names.append(fake.name())
      
      df = spark.createDataFrame(names, StringType())
      
      

      As-is

      First, let's check the currently proposed solution. I only add a print statement when loading the spaCy model, to see when the model gets loaded.

      # printing a msg each time we load the model
      def load_spacy_model():
          print("Loading spacy model...")
          return spacy.load("en_core_web_sm")
      
      def entities(x):
          global nlp
          import spacy
          nlp = load_spacy_model()
          ents=[]
      
          doc = nlp(x)
      
          for ent in doc.ents:
              if ent.label_ == 'PERSON' or ent.label_ == 'ORG':
                  ents.append(ent.label_)
          return ents
      
      
      def __get_entities(x):
          return x.apply(entities)
      
      get_entities_udf = pandas_udf(lambda x: __get_entities(x), "array<string>", PandasUDFType.SCALAR)
      
      df_new = df.withColumn('entity',get_entities_udf('value'))
      
      df_new.show()
      

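      We can then see that the model is loaded 16 times, i.e. once for every entry we process. Not what I want.

      Batch processing

      Rewrite using the decorator introduced in Spark 3.0+, which relies on Python type hints (Python 3.6+). Our UDF then uses nlp.pipe() to batch-process the entire pd.Series.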
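The 16 loads follow from where the load sits: `entities` is called once per element by `x.apply`, and each call loads the model again. The same count can be reproduced with plain pandas, using a hypothetical `load_model_stub` that only counts its calls:

```python
import pandas as pd

load_count = 0


def load_model_stub():
    """Hypothetical stand-in for load_spacy_model(); only counts calls."""
    global load_count
    load_count += 1
    return str.split  # dummy "model": splits text into tokens


def entities(text):
    nlp = load_model_stub()  # reloaded for every element, as in the code above
    return nlp(text)


names = pd.Series([f"name {i}" for i in range(16)])
tokens = names.apply(entities)
```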

      # printing a msg each time we load the model
      def load_spacy_model():
          print("Loading spacy model...")
          return spacy.load("en_core_web_sm")
      
      # decorator marking this function as a pandas_udf
      # that returns an array of strings for each row
      @pandas_udf(ArrayType(StringType()))
      # function receiving a pd.Series and returning a pd.Series
      def entities(list_of_text: pd.Series) -> pd.Series:
          global nlp
          nlp = load_spacy_model()
          docs = nlp.pipe(list_of_text)
      
          # keep only the string label of each entity: a pandas_udf can only
          # return Arrow-compatible types, so e.g. a Span object can't be returned
          ents=[
              [ent.label_ for ent in doc.ents]
              for doc in docs
          ]
          return pd.Series(ents)
      
      
      df_new = df.withColumn('entity',entities('value'))
      
      df_new.show()
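Stripped of Spark, the decorated function is just a `pd.Series -> pd.Series` function, so its logic can be checked on plain pandas. In this sketch `StubNLP` is a hypothetical stand-in whose `pipe` method, like `nlp.pipe`, lazily yields one result per input text; the model is loaded once per call rather than once per row.

```python
import pandas as pd

load_count = 0


class StubNLP:
    """Hypothetical stand-in for a spaCy Language object."""

    def pipe(self, texts):
        # like nlp.pipe: lazily yields one result per input text
        for text in texts:
            yield ["ORG"] if text.endswith("Inc.") else ["PERSON"]


def load_model_stub():
    """Hypothetical stand-in for load_spacy_model(); counts calls."""
    global load_count
    load_count += 1
    return StubNLP()


def entities(list_of_text: pd.Series) -> pd.Series:
    nlp = load_model_stub()        # once per batch, not once per row
    docs = nlp.pipe(list_of_text)  # stream over the whole batch
    return pd.Series(list(docs))   # one list of labels per input row


out = entities(pd.Series(["Apple Inc.", "John Doe", "Jane Doe"]))
```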
      

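      In my case the model was loaded 4 times, which is better. It is loaded each time a Python worker processes a batch, so the number depends on how many cores Spark uses and, more importantly in my case, on how many partitions the data has. So it is still not optimal.

      Broadcasting the nlp object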
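More precisely, Spark feeds a pandas UDF one Arrow batch at a time, and each partition is cut into batches of at most `spark.sql.execution.arrow.maxRecordsPerBatch` rows (10,000 by default), so 16 rows spread over 4 partitions give 4 calls. The sketch below imitates that split by hand; the 4-partition layout and `load_model_stub` are assumptions for illustration.

```python
import pandas as pd

load_count = 0


def load_model_stub():
    """Hypothetical stand-in for load_spacy_model(); only counts calls."""
    global load_count
    load_count += 1
    return str.upper  # dummy "model"


def entities(batch: pd.Series) -> pd.Series:
    nlp = load_model_stub()  # one load per batch
    return batch.map(nlp)


names = pd.Series([f"name {i}" for i in range(16)])
num_partitions = 4  # assumption: 16 rows spread evenly over 4 partitions
size = len(names) // num_partitions
batches = [names.iloc[i:i + size] for i in range(0, len(names), size)]
results = pd.concat([entities(b) for b in batches])
```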

      # printing a msg each time we load the model
      def load_spacy_model():
          print("Loading spacy model...")
          return spacy.load("en_core_web_sm")
      
      @pandas_udf(ArrayType(StringType()))
      def entities(list_of_text: pd.Series) -> pd.Series:
          nlp = broadcasted_nlp.value
          docs = nlp.pipe(list_of_text)
      
          # keep only the string label of each entity: a pandas_udf can only
          # return Arrow-compatible types, so e.g. a Span object can't be returned
          ents=[
              [ent.label_ for ent in doc.ents]
              for doc in docs
          ]
          return pd.Series(ents)
      
      broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
      
      df_new = df.withColumn('entity',entities('value'))
      
      df_new.show()
      

      Now the model is loaded only once and then broadcast to every Python worker that gets spawned.

      Full code
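Roughly, `broadcast()` pickles the value once on the driver, and each Python worker unpickles its own copy instead of calling `spacy.load` again; this relies on the spaCy pipeline being picklable. The sketch imitates that round trip with the standard `pickle` module and a hypothetical `load_model_stub` that returns a plain dict:

```python
import pickle

load_count = 0


def load_model_stub():
    """Hypothetical stand-in for load_spacy_model(); counts real loads."""
    global load_count
    load_count += 1
    return {"labels": ["PERSON", "ORG"]}


# driver side: load once and serialize (roughly what broadcast() does)
payload = pickle.dumps(load_model_stub())

# worker side: each Python worker deserializes its own copy, without reloading
worker_copies = [pickle.loads(payload) for _ in range(4)]
```

Unpickling rebuilds the object from bytes, so the loader runs only once no matter how many workers read the payload.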

      import pandas as pd
      
      from pyspark.sql import SparkSession
      from pyspark.sql import functions as F
      from pyspark.sql.types import StringType, ArrayType
      from pyspark.sql.functions import pandas_udf, PandasUDFType
      
      from faker import Faker
      import spacy
      
      spark = SparkSession.builder.appName("pyspark_sandbox").getOrCreate()
      
      # creating our set of fake person and company names
      names = []
      fake = Faker()
      for _ in range(8):
          names.append(f"{fake.company()} {fake.company_suffix()}")
          names.append(fake.name())
      
      df = spark.createDataFrame(names, StringType())
      
      # printing a msg each time we load the model
      def load_spacy_model():
          print("Loading spacy model...")
          return spacy.load("en_core_web_sm")
      
      # decorator marking this function as a pandas_udf
      # that returns an array of strings for each row
      @pandas_udf(ArrayType(StringType()))
      # function receiving a pd.Series and returning a pd.Series
      def entities(list_of_text: pd.Series) -> pd.Series:
          # retrieving the shared nlp object
          nlp = broadcasted_nlp.value
          # batch processing our list of text
          docs = nlp.pipe(list_of_text)
          
          # keep only the string label of each entity: a pandas_udf can only
          # return Arrow-compatible types, so e.g. a Span object can't be returned
          ents=[
              [ent.label_ for ent in doc.ents]
              for doc in docs
          ]
          return pd.Series(ents)
      
      # we load the spacy model and broadcast it
      broadcasted_nlp = spark.sparkContext.broadcast(load_spacy_model())
      
      df_new = df.withColumn('entity',entities('value'))
      
      df_new.show(truncate=False)
      
      

      Result

      +----------------------------------+--------------------------------+
      |value                             |entity                          |
      +----------------------------------+--------------------------------+
      |Ferguson, Price and Green Ltd     |[ORG, ORG, ORG]                 |
      |Cassandra Goodman MD              |[PERSON]                        |
      |Solis Ltd LLC                     |[ORG]                           |
      |Laurie Foster                     |[PERSON]                        |
      |Lane-Vasquez Group                |[ORG]                           |
      |Matthew Wright                    |[PERSON]                        |
      |Scott, Pugh and Rodriguez and Sons|[PERSON, PERSON, PERSON, PERSON]|
      |Tina Cooke                        |[PERSON]                        |
      |Watkins, Blake and Foster Ltd     |[ORG]                           |
      |Charles Reyes                     |[PERSON]                        |
      |Cooper, Norris and Roberts PLC    |[ORG]                           |
      |Michael Tate                      |[PERSON]                        |
      |Powell, Lawson and Perez and Sons |[PERSON, PERSON, PERSON, PERSON]|
      |James Wolf PhD                    |[PERSON]                        |
      |Greer-Swanson PLC                 |[ORG]                           |
      |Nicholas Hale                     |[PERSON]                        |
      +----------------------------------+--------------------------------+
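Note that this version keeps every entity label (any other label such as `GPE` would also be returned), whereas the question only wanted `PERSON` and `ORG`. A minimal sketch of adding that filter to the same nested comprehension, using hypothetical `namedtuple` stand-ins for spaCy's `Doc` and `Span`; inside `entities` the result would still be wrapped in `pd.Series(...)`:

```python
from collections import namedtuple

Ent = namedtuple("Ent", ["text", "label_"])  # hypothetical stand-in for spaCy's Span
Doc = namedtuple("Doc", ["ents"])            # hypothetical stand-in for spaCy's Doc

KEEP = {"PERSON", "ORG"}


def filter_labels(docs):
    # same nested comprehension as the answer, plus the question's PERSON/ORG filter
    return [[ent.label_ for ent in doc.ents if ent.label_ in KEEP] for doc in docs]


docs = [
    Doc(ents=[Ent("Apple Inc.", "ORG"), Ent("Cupertino", "GPE")]),
    Doc(ents=[Ent("John Doe", "PERSON")]),
    Doc(ents=[]),
]
labels = filter_labels(docs)
```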
      

      【Discussion】:

      • Neat solution, thanks!