【Question title】: How to submit a PyDeequ job from Jupyter Notebook to Spark/YARN
【Posted】: 2021-08-16 01:26:14
【Question description】:

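How do I configure the environment to submit a PyDeequ job from a Jupyter notebook to Spark/YARN (client mode)? The only comprehensive explanations I could find assume an AWS environment. How do I set up the environment for a non-AWS environment?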

Simply following the examples, such as Testing data quality at scale with PyDeequ, results in errors like TypeError: 'JavaPackage' object is not callable.

from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
      1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
      3     .onData(df) \
      4     .addAnalyzer(Size()) \
      5     .addAnalyzer(Completeness("review_id")) \

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
     50         """
     51         df = ensure_pyspark_df(self._spark_session, df)
---> 52         return AnalysisRunBuilder(self._spark_session, df)
     53 
     54 

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
    122         self._jspark_session = spark_session._jsparkSession
    123         self._df = df
--> 124         self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
    125 
    126     def addAnalyzer(self, analyzer: _AnalyzerObject):

TypeError: 'JavaPackage' object is not callable

【Question comments】:

    Tags: python amazon-deequ


    【Answer 1】:

    HADOOP_CONF_DIR

    Copy the contents of $HADOOP_HOME/etc/hadoop from the Hadoop/YARN master node to the local host, and set the HADOOP_CONF_DIR environment variable to point to that directory.

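    Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory that contains the (client-side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and to connect to the YARN ResourceManager. The configuration in this directory is distributed to the YARN cluster so that all containers used by the application use the same configuration.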

    import os
    # Must be set before the SparkSession (and therefore the JVM) is created
    os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
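
A minimal sketch of the step above, assuming the config directory has already been copied to the local host. The hypothetical helper `set_hadoop_conf_dir` checks that a few client-side files are present (the file list is an assumption; adjust it for your cluster) and then sets both variables. Call it before the SparkSession is created, because the JVM is launched with the notebook kernel's environment at that point.

```python
import os
from typing import Iterable

# Files assumed to be present in a usable client-side Hadoop/YARN config dir.
# This list is an assumption; adjust it for your cluster.
REQUIRED_FILES = ("core-site.xml", "hdfs-site.xml", "yarn-site.xml")


def set_hadoop_conf_dir(conf_dir: str, required: Iterable[str] = REQUIRED_FILES) -> str:
    """Point HADOOP_CONF_DIR and YARN_CONF_DIR at conf_dir after a sanity check.

    Hypothetical helper. Returns the absolute path that was set.
    """
    conf_dir = os.path.abspath(conf_dir)
    if not os.path.isdir(conf_dir):
        raise NotADirectoryError(conf_dir)
    # Collect every expected file that is missing so the error lists them all.
    missing = [name for name in required
               if not os.path.isfile(os.path.join(conf_dir, name))]
    if missing:
        raise FileNotFoundError(f"{conf_dir} is missing: {', '.join(missing)}")
    os.environ["HADOOP_CONF_DIR"] = conf_dir
    os.environ["YARN_CONF_DIR"] = conf_dir
    return conf_dir
```

For example, `set_hadoop_conf_dir("/opt/hadoop/hadoop-3.2.2/etc/hadoop")` replaces the bare `os.environ` assignment and fails early if the copy was incomplete.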
    

    PYTHONPATH

    pyspark

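    The pyspark Python module must be importable. Either install pyspark with pip or conda, which also installs the Spark runtime libraries (for standalone use), or use the pyspark module that ships with the Spark installation under $SPARK_HOME/python/lib.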

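    Ensure the SPARK_HOME environment variable points to the directory where the Spark tar file was extracted. Update the PYTHONPATH environment variable so that it can find PySpark and Py4J under SPARK_HOME/python/lib. One example of doing this is shown below: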

    export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
    
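    import sys
    # Inside a running notebook, extend sys.path directly (PYTHONPATH is only read at interpreter start-up)
    sys.path.extend([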
        "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
        "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
    ])
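
Instead of hard-coding the py4j version in the paths, the zips can be located with `glob`, which is what the shell one-liner does. A sketch with hypothetical helper names; unlike the `sys.path.extend` call, it prepends (as the `export` line prepends to PYTHONPATH), so the Spark installation's pyspark takes precedence over any pip-installed copy.

```python
import glob
import os
import sys
from typing import List


def spark_python_zips(spark_home: str) -> List[str]:
    """Return the PySpark and Py4J zips under $SPARK_HOME/python/lib, sorted."""
    zips = sorted(glob.glob(os.path.join(spark_home, "python", "lib", "*.zip")))
    if not zips:
        raise FileNotFoundError(f"no *.zip found under {spark_home}/python/lib")
    return zips


def add_to_sys_path(paths: List[str]) -> None:
    """Prepend paths to sys.path in order, skipping entries already present."""
    # Insert in reverse so the final order in sys.path matches `paths`.
    for p in reversed(paths):
        if p not in sys.path:
            sys.path.insert(0, p)
```

Usage, before the first `import pyspark` in the kernel: `add_to_sys_path(spark_python_zips(os.environ["SPARK_HOME"]))`.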
    

    PyDeequ

    Install pydeequ with pip or conda. Note that installing pydeequ alone is not enough: the Deequ JAR must also be available to Spark.

    Deequ JAR file

    Put the jar on the library path

    PyDeequ requires the Deequ jar file. Download the one matching your Spark/Deequ versions from the Maven repository com.amazon.deequ.

    import os
    import sys
    
    root = os.path.dirname(os.path.realpath(os.getcwd()))
    deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
    classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"
    os.makedirs(f"{root}/jar", exist_ok=True)  # wget -O does not create missing directories
    
    !wget -q -O $classpath $deequ_jar
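
The `!wget` line only works inside IPython. A plain-Python sketch of the same download, assuming the `<deequ version>-spark-<spark version>` naming seen in the URL above holds for the versions you need (check the repository listing); `deequ_jar_url` and `download_jar` are hypothetical helper names.

```python
import os
import urllib.request

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def deequ_jar_url(deequ_version: str, spark_version: str) -> str:
    """Build the Maven Central URL of a Deequ jar.

    Assumes the "<deequ>-spark-<spark>" version scheme from the URL above,
    e.g. "2.0.0-spark-3.1".
    """
    version = f"{deequ_version}-spark-{spark_version}"
    return f"{MAVEN_CENTRAL}/com/amazon/deequ/deequ/{version}/deequ-{version}.jar"


def download_jar(url: str, dest_dir: str) -> str:
    """Download url into dest_dir (created if missing) and return the local path."""
    os.makedirs(dest_dir, exist_ok=True)
    dest = os.path.join(dest_dir, url.rsplit("/", 1)[-1])
    if not os.path.isfile(dest):  # skip the download if the jar is already there
        urllib.request.urlretrieve(url, dest)
    return dest
```

Usage: `classpath = download_jar(deequ_jar_url("2.0.0", "3.1"), f"{root}/jar")`.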
    

    Spark session

    Specify the Deequ jar file in the Spark jar properties as shown below:

    import pydeequ
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .master('yarn') \
        .config('spark.submit.deployMode', 'client') \
        .config("spark.driver.extraClassPath", classpath) \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .config('spark.sql.debug.maxToStringFields', 100) \
        .config('spark.executor.memory', '2g') \
        .getOrCreate()
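
In the question's traceback, `self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder` resolved to a py4j `JavaPackage` instead of a class, which is what py4j returns when the class is not on the driver JVM's classpath. A missing or broken `spark.driver.extraClassPath` entry does not raise an error by itself (for example, a failed `wget -q -O` can leave an empty file behind), so checking the jar before building the session may save some debugging. A sketch; `jar_has_class` is a hypothetical helper, and the class entry is taken from the traceback.

```python
import zipfile

# Class the question's traceback failed to load, as a path inside the jar.
ANALYSIS_RUN_BUILDER = "com/amazon/deequ/analyzers/runners/AnalysisRunBuilder.class"


def jar_has_class(jar_path: str, class_entry: str = ANALYSIS_RUN_BUILDER) -> bool:
    """Return True if jar_path is a readable jar that contains class_entry."""
    try:
        with zipfile.ZipFile(jar_path) as jar:
            return class_entry in jar.namelist()
    except (FileNotFoundError, zipfile.BadZipFile):
        # Missing file, or not a zip at all (e.g. an empty or HTML error page).
        return False
```

Usage, before `SparkSession.builder`: `assert jar_has_class(classpath), classpath`.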
    

    Deequ job

    Use an excerpt of the Amazon product reviews data.

    df = spark.read.csv(
        path=f"file:///{root}/data/amazon_product_reviews.csv.gz",
        header=True,
    )
    df.printSchema()
    -----
    root
     |-- review_id: string (nullable = true)
     |-- marketplace: string (nullable = true)
     |-- product_id: string (nullable = true)
     |-- year: string (nullable = true)
     |-- star_rating: string (nullable = true)
     |-- total_votes: string (nullable = true)
     |-- helpful_votes: string (nullable = true)
     |-- product_category: string (nullable = true)
    
    from pydeequ.analyzers import *
    analysisResult = AnalysisRunner(spark) \
        .onData(df) \
        .addAnalyzer(Size()) \
        .addAnalyzer(Completeness("review_id")) \
        .addAnalyzer(ApproxCountDistinct("review_id")) \
        .addAnalyzer(Mean("star_rating")) \
        .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
        .addAnalyzer(Correlation("total_votes", "star_rating")) \
        .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
        .run()
                        
    analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    analysisResult_df.show()
    -----
    21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                    
    +-------+---------------+-------------------+------+
    | entity|       instance|               name| value|
    +-------+---------------+-------------------+------+
    | Column|      review_id|       Completeness|   1.0|
    | Column|      review_id|ApproxCountDistinct|1040.0|
    |Dataset|              *|               Size|1000.0|
    | Column|top star_rating|         Compliance| 0.657|
    +-------+---------------+-------------------+------+
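
The output above has no rows for the Mean and the two Correlation analyzers. A likely explanation (my reading, not stated in the answer) is that `spark.read.csv` was called without `inferSchema=True`, so every column is a string, as `printSchema()` shows; Deequ's Mean and Correlation need numeric columns, and `successMetricsAsDataFrame` lists only the metrics that succeeded. A small stdlib sketch that flags such columns from `df.dtypes`-style `(name, type)` pairs; `non_numeric_columns` is a hypothetical helper.

```python
from typing import Iterable, List, Tuple

# Spark SQL type names for numeric types, as reported by DataFrame.dtypes.
_NUMERIC = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def is_numeric(type_name: str) -> bool:
    # Decimal types carry precision/scale, e.g. "decimal(10,2)".
    return type_name in _NUMERIC or type_name.startswith("decimal")


def non_numeric_columns(dtypes: Iterable[Tuple[str, str]],
                        columns: Iterable[str]) -> List[str]:
    """Return the requested columns that are missing or not numeric."""
    types = dict(dtypes)
    return [c for c in columns if c not in types or not is_numeric(types[c])]
```

For the DataFrame above, `non_numeric_columns(df.dtypes, ["star_rating", "total_votes", "helpful_votes"])` returns all three. Reading with `spark.read.csv(..., header=True, inferSchema=True)`, or casting the columns (e.g. `df.withColumn("star_rating", df["star_rating"].cast("double"))`), should let those analyzers run.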
    

    【Comments】:
