【发布时间】:2018-07-02 13:40:28
【问题描述】:
我正在尝试使用 Google phonenumbers 库的 Python 端口来规范化 5000 万个电话号码。我正在从 S3 上的 Parquet 文件中读取 SparkDataFrame,然后在数据帧上运行操作。以下函数 parsePhoneNumber 表示为 UDF:
def isValidNumber(phoneNum):
try:
pn = phonenumbers.parse(phoneNum, "US")
except:
return False
else:
return phonenumbers.is_valid_number(pn) and phonenumbers.is_possible_number(pn)
def parsePhoneNumber(phoneNum):
if isValidNumber(phoneNum):
parsedNumber = phonenumbers.parse(phoneNum, "US")
formattedNumber = phonenumbers.format_number(parsedNumber, phonenumbers.PhoneNumberFormat.E164)
return (True, parsedNumber.country_code, formattedNumber, parsedNumber.national_number, parsedNumber.extension)
else:
return (False, None, None, None)
下面是我如何使用 UDF 派生新列的示例:
newDataFrame = oldDataFrame.withColumn("new_column", parsePhoneNumber_udf(oldDataFrame.phone)).select("id", "new_column".national_number)
通过运行display(newDataFrame) 或newDataFrame.show(5) 或类似的东西来执行UDF 只使用集群中的一个执行器,因此UDF 中的某些东西似乎不会导致它只在一个worker 上运行。
如果我正在做任何会阻止它并行运行的事情,您能否提供一些见解?
执行环境位于由 Databricks 控制的云集群上。
编辑:下面是oldDataFrame.explain的输出
== Parsed Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Analyzed Logical Plan ==
id: string, person_id: string, phone: string, type: string, source_id: string, created_date: string, modified_date: string
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Optimized Logical Plan ==
Relation[id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] parquet
== Physical Plan ==
*FileScan parquet [id#621,person_id#622,phone#623,type#624,source_id#625,created_date#626,modified_date#627] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/mnt/person-data/parquet/phone], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,person_id:string,phone:string,type:string,source_id:string,created_date:strin...
【问题讨论】:
-
如何创建 SparkSession?
-
我认为没有必要在 Databricks 上构建 SparkSession;因此,我直接使用
sqlContext。 -
好的,但是您的 SparkSession 在哪种模式下运行?可能是
local,那么它会在一个节点上运行 -
请运行
sc.master,其中 sc 是 SparkContext -
运行
sc.master返回一个 URL。
标签: python apache-spark pyspark databricks