【发布时间】:2021-11-28 10:01:27
【问题描述】:
我正在使用 Spark 运行一个脚本,该脚本在我的计算机和使用所有并行可用内核(约 6000 个进程)的 Google VM 上都可以正常工作。 但是,当我尝试在 Azure Databricks 上运行它时,使用最少 2 个工作人员和最多 25 个工作人员的集群,每个工作人员有 4 个内核并运行 DB 9.0,它只是一个接一个地运行而没有并行化。
除了在任何 VM 中的常规设置之外,是否需要在 Databricks 中运行 Spark 的任何额外设置?
这是我用来调试并行化问题的测试脚本(集群负载
import pandas as pd
import os
import numpy as np
import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import BooleanType, StructType,StructField,IntegerType,FloatType, StringType
def main(df_estructural):
sku = df_estructural.Material.iloc[0]
df = pd.read_csv(os.path.join(DATA, 'Sales', str(sku)+'.csv'), header=0, thousands=',', decimal='.', encoding = 'latin-1')
local_unique = len(df.Local.unique())
return pd.DataFrame(np.array([[str(FIN.value), str(sku), str(local_unique )]]), columns=('Date', 'Material', 'Local'))
if __name__ == '__main__':
FIN = '2021-09-27'
DATA = '/dbfs/mnt/simulaciones/data'
DATA_RESULTADOS = '/dbfs/mnt/simulaciones/Resultados'
df = pd.read_csv(os.path.join(DATA, 'list.csv'), sep=',', header=0, thousands='.', decimal=',', encoding = 'latin-1')
df = df_OTB[['Material', 'Alpha']]
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
FIN = spark.sparkContext.broadcast(FIN)
schema = StructType([StructField('Date', StringType(), True),
StructField('Material', StringType(), True),
StructField('Local', StringType(), True)])
df_spark = spark.createDataFrame(df)
df_estructural = df_spark \
.groupby(['Material']) \
.applyInPandas(main, schema=schema) \
.collect()
spark.createDataFrame(df_estructural).toPandas().to_csv(os.path.join(DATA_RESULTADOS,'test.csv')), index=False)
【问题讨论】:
-
@Kafels 没关系,并发进程的数量应该等于分组数据框中的组数。
-
如果您真的想在代码中处理并行化,我们可以使用 ThreadPoolExecutor 概念。这样您就可以通过完全控制轻松处理并行性。docs.python.org/3/library/concurrent.futures.html
标签: python azure apache-spark databricks