【问题标题】:Spark UDF not running in parallelSpark UDF 未并行运行
【发布时间】:2018-07-02 13:40:28
【问题描述】:

我正在尝试使用 Google phonenumbers 库的 Python 端口来规范化 5000 万个电话号码。我正在从 S3 上的 Parquet 文件中读取 SparkDataFrame,然后在数据帧上运行操作。以下函数 parsePhoneNumber 表示为 UDF:

def isValidNumber(phoneNum):
    try:
        pn = phonenumbers.parse(phoneNum, "US")
    except:
        return False
    else:
        return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)

def parsePhoneNumber(phoneNum):
    if isValidNumber(phoneNum):
        parsedNumber = phonenumbers.parse(phoneNum, "US")
        formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)

        return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
    else:
        return (False, None, None, None)

下面是我如何使用 UDF 派生新列的示例:

newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column".national_number)

通过运行display(newDataFrame)newDataFrame.show(5) 或类似的东西来执行UDF 只使用集群中的一个执行器,因此UDF 中的某些东西似乎不会导致它只在一个worker 上运行。

如果我正在做任何会阻止它并行运行的事情,您能否提供一些见解?

执行环境位于由 Databricks 控制的云集群上。

编辑:下面是oldDataFrame.explain的输出

== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet

== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...

【问题讨论】:

  • 如何创建 SparkSession?
  • 我认为没有必要在 Databricks 上构建 SparkSession;因此,我直接使用sqlContext
  • 好的,但是您的 SparkSession 在哪种模式下运行?可能是local,那么它会在一个节点上运行
  • 请运行sc.master,其中 sc 是 SparkContext
  • 运行 sc.master 返回一个 URL。

标签: python apache-spark pyspark databricks


【解决方案1】:

你们都很好。 Display,默认参数最多显示前 1000 行。同样newDataFrame.show(5) 只显示前五行。

同时执行普通 (oldDataFrame.explain) 显示没有随机播放,因此在这两种情况下,Spark 将仅评估最小数量的分区以获得所需的行数 - 对于这些值,它可能是一个分区。

如果你想确定:

  • 检查oldDataFrame.rdd.getNumPartitions() 是否大于一。
  • 如果是,则使用df.foreach(lambda _: None)newDataFrame.foreach(lambda _: None) 强制执行所有分区。

您应该会看到更多活跃的执行者。

【讨论】:

  • show() 使用普通的limit - 它应该先进行局部限制,然后再进行全局限制。也许 Parquet 文件只有一个分区?
  • 在将 CSV 文件转换为 Parquet 时,我一直将它们划分为 25 个,因此 oldDataFrame.rdd.getNumPartitions() 的输出为 25。此外,运行 oldDataFrame.foreach(lambda _: None) 会在 4 个工作人员上执行 4 个任务。到那时,在这里使用withColumn 是合适的策略吗?我假设简单地映射 DataFrame 也会并行执行。
  • @sean Spark 变得越来越复杂,也许他们对此进行了优化。所以,投票值得;)
  • @T.Gawęda 据我所知,show(n) 将评估为 df.limit(n).queryExecution...,并且通过窄转换(无随机播放)它只会收集所需的数量。
  • @T.Gawęda 拥有优化器最糟糕的事情是你永远不知道那里到底发生了什么。现在我们有了基于成本的 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-01-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-05
相关资源
最近更新 更多