【问题标题】:PySpark Environment Setup for Pandas UDFPandas UDF 的 PySpark 环境设置
【发布时间】:2021-09-24 18:03:21
【问题描述】:

-编辑-

这个简单的示例只显示了 3 条记录,但我需要对数十亿条记录执行此操作,因此我需要使用 Pandas UDF,而不是仅仅将 Spark DF 转换为 Pandas DF 并使用简单的应用。

输入数据

期望的输出

-结束编辑-

我一直在努力解决这个问题,我希望有人可以帮助我解决这个问题。我正在尝试将 PySpark 数据帧中的纬度/经度值转换为 Uber 的 H3 十六进制系统。这是函数h3.geo_to_h3(lat=lat, lng=lon, resolution=7) 的一个非常简单的用法。但是,我的 PySpark 集群一直存在问题。

我正在按照 databricks 文章 here 中的描述设置我的 PySpark 集群,使用以下命令:

  1. conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas h3 numpy python=3.7 conda-pack
  2. conda init --all 然后关闭并重新打开终端窗口
  3. conda activate pyspark_conda_env
  4. conda pack -f -o pyspark_conda_env.tar.gz

我在 jupyter 笔记本中包含创建 spark 集群时创建的 tar.gz 文件,就像 spark = SparkSession.builder.master("yarn").appName("test").config("spark.yarn.dist.archives","<path>/pyspark_conda_env.tar.gz#environment").getOrCreate()

我的 pandas udf 设置如下

#create udf to convert lat lon to h3 hex
def convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
    import h3 as h3
    import numpy as np
    if ((None in [lat, lon]) | (np.isnan(lat))):
        return None
    else:
        return (h3.geo_to_h3(lat=lat, lng=lon, resolution=7))

@f.pandas_udf('string', f.PandasUDFType.SCALAR)
def udf_convert_to_h3(lat : pd.Series, lon : pd.Series) -> pd.Series:
    import pandas as pd
    import numpy as np
    df = pd.DataFrame({'lat' : lat, 'lon' : lon})
    df['h3_res7'] = df.apply(lambda x : convert_to_h3(x['lat'], x['lon']), axis = 1)
    return df['h3_res7']

使用 pandas udf 创建新列并尝试查看后:

trip_starts = trip_starts.withColumn('h3_res7', udf_convert_to_h3(f.col('latitude'), f.col('longitude')))

我收到以下错误:

21/07/15 20:05:22 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 139 for reason Container marked as failed: container_1626376534301_0015_01_000158 on host: ip-xx-xxx-xx-xxx.aws.com. Exit status: -100. Diagnostics: Container released on a *lost* node.

我不知道该怎么做,因为我已尝试将记录数量减少到更易于管理的数量,但仍然遇到此问题。理想情况下,我想弄清楚如何使用我链接的 databricks 博客文章中描述的 PySpark 环境,而不是在启动集群时运行引导脚本,因为公司政策使引导脚本更难以运行。

【问题讨论】:

  • 简单点,给出样本数据和预期输出
  • 感谢您的建议,我会尽快编辑原帖!
  • 根据您的要求@wwnde,我在这里添加了一个简单的示例。谢谢!
  • @wwnde,对此有何想法?
  • 请看这个答案,也许这对你有帮助。 stackoverflow.com/questions/67869938/…

标签: python amazon-web-services dataframe pyspark bigdata


【解决方案1】:

我最终通过将我的数据重新分区到更小的分区中来解决这个问题,每个分区中的记录更少。这解决了我的问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-05-16
    • 1970-01-01
    • 2021-08-13
    • 2021-07-06
    • 2022-01-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多