【问题标题】:Multi-processing in Azure DatabricksAzure Databricks 中的多处理
【发布时间】:2022-03-01 20:19:19
【问题描述】:

我最近接到任务,将 JSON 响应提取到 Databricks Delta-lake。我必须使用不同的参数访问 REST API 端点 URL 6500 次并拉取响应。

我尝试了多处理库中的两个模块,ThreadPool 和 Pool,以使每次执行速度更快。

线程池:

  1. 当 Azure Databricks 集群设置为从 2 到 13 个工作节点自动缩放时,如何选择 ThreadPool 的线程数?

现在,我设置了 n_pool = multiprocessing.cpu_count(),如果集群自动扩展,会有什么不同吗?

  1. 当我使用 Pool 来使用处理器而不是线程时。我在每次执行时随机看到以下错误。好吧,我从缺少 Spark Session/Conf 的错误中了解到,我需要从每个进程中设置它。但是我在启用了默认 Spark 会话的 Databricks 上,那么为什么我会看到这些错误。
Py4JError: SparkConf does not exist in the JVM 
**OR** 
py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
  1. 最后,计划用“concurrent.futures.ProcessPoolExecutor”替换多处理。有什么区别吗?

【问题讨论】:

    标签: python azure azure-databricks


    【解决方案1】:

    如果您使用线程池,它们将仅在驱动程序节点上运行,执行程序将处于空闲状态。相反,您需要使用 Spark 本身来并行化请求。这通常是通过创建一个包含 URL 列表的数据框(如果基本 URL 相同,则为 URL 的参数),然后使用 Spark user defined function 执行实际请求。像这样的:

    import urllib
    
    df = spark.createDataFrame([("url1", "params1"), ("url2", "params2")], 
                               ("url", "params"))
    
    @udf("body string, status int")
    def do_request(url: str, params: str):
      full_url = url + "?" + params # adjust this as required
      with urllib.request.urlopen(full_url) as f:
        status = f.status
        body = f.read().decode("utf-8")
      
      return {'status': status, 'body': body}
      
    
    res = df.withColumn("result", do_requests(col("url"), col("params")))
    

    这将返回带有一个名为 result 的新列的数据框,该列将包含两个字段 - statusbody(JSON 答案为字符串)。

    【讨论】:

      【解决方案2】:

      您可以尝试以下方法解决

      Py4JError: SparkConf does not exist in the JVM
      **OR** 
      py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM
      

      错误

      Install findspark 
      $pip install findspark
      Code:
      import findsparkfindspark.init()
      

      参考:Py4JError: SparkConf does not exist in the JVMpy4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getEncryptionEnabled does not exist in the JVM

      【讨论】:

        猜你喜欢
        • 2022-07-04
        • 1970-01-01
        • 2021-06-14
        • 1970-01-01
        • 2021-08-17
        • 2021-01-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多