【问题标题】:Azure Databricks not paralelizing on SparkAzure Databricks 未在 Spark 上并行化
【发布时间】:2021-11-28 10:01:27
【问题描述】:

我正在使用 Spark 运行一个脚本,该脚本在我的计算机和使用所有并行可用内核(约 6000 个进程)的 Google VM 上都可以正常工作。 但是,当我尝试在 Azure Databricks 上运行它时,使用最少 2 个工作人员和最多 25 个工作人员的集群,每个工作人员有 4 个内核并运行 DB 9.0,它只是一个接一个地运行而没有并行化。

除了在任何 VM 中的常规设置之外,是否需要在 Databricks 中运行 Spark 的任何额外设置?

这是我用来调试并行化问题的测试脚本(集群负载

import pandas as pd
import os
import numpy as np
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, StructType,StructField,IntegerType,FloatType, StringType


def main(df_estructural):
    sku = df_estructural.Material.iloc[0]
    
    df = pd.read_csv(os.path.join(DATA, 'Sales', str(sku)+'.csv'), header=0, thousands=',', decimal='.', encoding = 'latin-1')
    local_unique = len(df.Local.unique())

    return pd.DataFrame(np.array([[str(FIN.value), str(sku), str(local_unique )]]), columns=('Date', 'Material', 'Local'))


if __name__ == '__main__':  
    FIN = '2021-09-27'

    DATA = '/dbfs/mnt/simulaciones/data'
    DATA_RESULTADOS = '/dbfs/mnt/simulaciones/Resultados'

    df = pd.read_csv(os.path.join(DATA, 'list.csv'), sep=',', header=0, thousands='.', decimal=',', encoding = 'latin-1')
    df = df_OTB[['Material', 'Alpha']]

    spark = SparkSession \
        .builder \
        .appName("test") \
        .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
        .getOrCreate()

    FIN = spark.sparkContext.broadcast(FIN)

    schema = StructType([StructField('Date', StringType(), True),
                        StructField('Material', StringType(), True),
                        StructField('Local', StringType(), True)])

    df_spark = spark.createDataFrame(df) 

    df_estructural = df_spark \
                .groupby(['Material']) \
                .applyInPandas(main, schema=schema) \
                .collect()

    spark.createDataFrame(df_estructural).toPandas().to_csv(os.path.join(DATA_RESULTADOS,'test.csv')), index=False)
  

【问题讨论】:

  • @Kafels 没关系,并发进程的数量应该等于分组数据框中的组数。
  • 如果您真的想在代码中处理并行化,我们可以使用 ThreadPoolExecutor 概念。这样您就可以通过完全控制轻松处理并行性。docs.python.org/3/library/concurrent.futures.html

标签: python azure apache-spark databricks


【解决方案1】:

试试converting Pandas to Koalas,它应该比你当前在驱动程序节点上运行的 Pandas 更好地在你的 Databricks 节点上并行化。

【讨论】:

  • 感谢您的提醒,尝试过但收到以下错误:“从 UDF 引发异常:'异常:SparkContext 只能在驱动程序上创建和访问。”
【解决方案2】:

终于找到问题了,贴在这里,以防其他人遇到类似情况。

鉴于 pandas 数据框只有 6000 行,Databricks 并没有对其进行并行化(也许它认为它不够大)。因此,我通过添加重新分区的数量来强制它:

   df_estructural = df_spark \
                .repartition(104) \
                .groupby(['Material']) \
                .applyInPandas(main, schema=schema) \
                .collect()

【讨论】:

    猜你喜欢
    • 2023-01-23
    • 2022-07-06
    • 1970-01-01
    • 2020-01-13
    • 2020-02-29
    • 1970-01-01
    • 1970-01-01
    • 2019-03-08
    • 1970-01-01
    相关资源
    最近更新 更多