【问题标题】:Improving performance of PySpark with Dataframes and SQL使用 Dataframes 和 SQL 提高 PySpark 的性能
【发布时间】:2019-01-15 03:02:57
【问题描述】:

我目前正在读取包含数百个 CSV 文件的公共 AWS 存储桶中的所有文本文件。我一次读取所有 CSV 文件,然后将它们转换为 RDD 并开始处理数据,以便可以将其存储在 Cassandra 中。处理所有文本文件需要两个半小时,这对于仅 100GB 的数据来说太长了。我可以对下面的代码做些什么来让它更快吗?

感谢任何建议。我也试过阅读这个https://robertovitillo.com/2015/06/30/spark-best-practices/,但我对如何实现一些提到的东西感到困惑,比如“使用正确的并行度”。我还尝试通过执行 rdd.cache 将我的 RDD 存储在缓存中,但这仍然需要两个多小时。

conf = SparkConf() \
   .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
rdd = sc.textFile("s3a://gdelt-open-data/events/*")

def rddCleaning(rd,timeframe):

 def check_valid(tup):
    try:
        int(tup[1])
        int(tup[4])
        float(tup[5])
        float(tup[6])
        return True
    except ValueError:
        return False


 def fillin(tup):
    if tup[2] == "" and tup[3] != "":
        return ((tup[0],tup[1],tup[3],tup[4],tup[5],tup[6]))
    else:
        return ((tup[0],tup[1],tup[2],tup[4],tup[5],tup[6]))

 def popular_avg(curr_tup):
    lst = curr_tup[1]
    dictionary = curr_tup[2]
    dict_matches = {}
    for tup in lst:
    event_type = tup[0]
        dict_matches[event_type] = dictionary[event_type]
    return ((curr_tup[0],lst,dict_matches,curr_tup[3]))

 def merge_info(tup):
    main_dict = tup[1]
    info_dict = tup[2]
    for key in info_dict:
        main_dict[key].update(info_dict[key])
    main_dict["TotalArticles"] = {"total":tup[3]}
    return ((tup[0],main_dict))

 def event_todict(tup):
    lst = tup[1]
    dict_matches = {}
    for event_tup in lst:
        dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]}
    return ((tup[0],dict_matches,tup[2],tup[3]))

 def sum_allevents(tup):
    type_lst = tup[1]
    total_mentions = 0
    for event in type_lst:
            total_mentions += event[1]
    return ((tup[0],type_lst,tup[2],total_mentions))

actionGeo_CountryCode = 51
time = 0
actor1Type1Code = 12
actor2Type1Code = 22
numArticles = 33
goldsteinScale = 30
avgTone = 34

if timeframe == "SQLDATE":
time = 1
elif timeframe == "MonthYear":
time = 2
else:
time = 3




rdd_reduce  = rd.map(lambda x: x.split('\t')) \
        .map(lambda y: ((y[actionGeo_CountryCode],
                                 y[time],
                                 y[actor1Type1Code],
                                 y[actor2Type1Code],
                                 y[numArticles],
                                 y[goldsteinScale],
                                 y[avgTone]))) \
        .filter(check_valid) \
        .map(lambda c: ((c[0],int(c[1]),c[2],c[3],int(c[4]),int(float(c[5])),int(float(c[6]))))) \
        .map(fillin) \
                .filter(lambda r: r[0] in tofullname and r[2] in toevent and  r[2] != "" and r[0] != "") \
                .map(lambda t: ((tofullname[t[0]],t[1],toevent[t[2]],t[3],t[4],t[5]))) \
                .map(lambda f: (((f[0],f[1],f[2]),(f[3],f[4],f[5],1)))) \
        .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
        .map(lambda s: ((s[0],(s[1][0],s[1][1]/s[1][3],s[1][2]/s[1][3]))))


rdd_format = rdd_reduce.map(lambda t:((t[0][0],t[0][1]),
                                      ([(t[0][2],t[1][0])],
                                      [(t[0][2],{"GoldsteinScaleAvg":t[1][1],
                                                "ToneAvg":t[1][2]})]))) \
           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
           .map(lambda v: (v[0],
                                       sorted(v[1][0],key=itemgetter(1),reverse=True),
                                       v[1][1])) \
               .map(sum_allevents) \
                       .map(lambda f: ((f[0],f[1][:5],dict(f[2]),f[3]))) \
                       .map(popular_avg) \
               .map(event_todict) \
                       .map(merge_info) \
           .map(lambda d: ((d[0][0],d[0][1],d[1])))

return rdd_format




daily_rdd = rddCleaning(rdd,"SQLDATE")
print(daily_rdd.take(6));
monthly_rdd = rddCleaning(rdd,"MonthYear")
print(monthly_rdd.take(6));
yearly_rdd = rddCleaning(rdd,"Year")
print(yearly_rdd.take(6));

这是我运行的 pyspark 的图片:

根据建议进行的修改: 我对我的代码进行了以下更改,它提高了性能,但仍然需要很长时间。发生这种情况是因为每次我调用 df 它都会从我的 S3 存储桶中重新读取所有文件吗?我应该把我的一些 df 和临时表放在缓存中吗? 这是我的代码:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType, IntegerType
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra

sc = SparkContext()
sqlContext = SQLContext(sc)

customSchema = schema = StructType([
        StructField('GLOBALEVENTID',StringType(),True),
        StructField('SQLDATE',StringType(),True),
        StructField('MonthYear',StringType(),True),
        StructField('Year',StringType(),True),
        StructField('FractionDate',StringType(),True),
        StructField('Actor1Code',StringType(),True),
        StructField('Actor1Name',StringType(),True),
        StructField('Actor1CountryCode',StringType(),True),
        StructField('Actor1KnownGroupCode',StringType(),True),
        StructField('Actor1EthnicCode',StringType(),True),
        StructField('Actor1Religion1Code',StringType(),True),
        StructField('Actor1Religion2Code',StringType(),True),
        StructField('Actor1Type1Code',StringType(),True),
        StructField('Actor1Type2Code',StringType(),True),
        StructField('Actor1Type3Code',StringType(),True),
        StructField('Actor2Code',StringType(),True),
        StructField('Actor2Name',StringType(),True),
        StructField('Actor2CountryCode',StringType(),True),
        StructField('Actor2KnownGroupCode',StringType(),True),
        StructField('Actor2EthnicCode',StringType(),True),
        StructField('Actor2Religion1Code',StringType(),True),
        StructField('Actor2Religion2Code',StringType(),True),
        StructField('Actor2Type1Code',StringType(),True),
        StructField('Actor2Type2Code',StringType(),True),
        StructField('Actor2Type3Code',StringType(),True),
        StructField('IsRootEvent',StringType(),True),
        StructField('EventCode',StringType(),True),
        StructField('EventBaseCode',StringType(),True),
        StructField('EventRootCode',StringType(),True),
        StructField('QuadClass',StringType(),True),
        StructField('GoldsteinScale',StringType(),True),
        StructField('NumMentions',StringType(),True),
        StructField('NumSources',StringType(),True),
        StructField('NumArticles',StringType(),True),
        StructField('AvgTone',StringType(),True),
        StructField('Actor1Geo_Type',StringType(),True),
        StructField('Actor1Geo_FullName',StringType(),True),
        StructField('Actor1Geo_CountryCode',StringType(),True),
        StructField('Actor1Geo_ADM1Code',StringType(),True),
        StructField('Actor1Geo_Lat',StringType(),True),
        StructField('Actor1Geo_Long',StringType(),True),
        StructField('Actor1Geo_FeatureID',StringType(),True),
        StructField('Actor2Geo_Type',StringType(),True),
        StructField('Actor2Geo_FullName',StringType(),True),
        StructField('Actor2Geo_CountryCode',StringType(),True),
        StructField('Actor2Geo_ADM1Code',StringType(),True),
        StructField('Actor2Geo_Lat',StringType(),True),
        StructField('Actor2Geo_Long',StringType(),True),
        StructField('Actor2Geo_FeatureID',StringType(),True),
        StructField('ActionGeo_Type',StringType(),True),
        StructField('ActionGeo_FullName',StringType(),True),
        StructField('ActionGeo_CountryCode',StringType(),True),
        StructField('ActionGeo_ADM1Code',StringType(),True),
        StructField('ActionGeo_Lat',StringType(),True),
        StructField('ActionGeo_Long',StringType(),True),
        StructField('ActionGeo_FeatureID',StringType(),True),
        StructField('DATEADDED',StringType(),True),
        StructField('SOURCEURL',StringType(),True)])

df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false') \
    .options(delimiter="\t") \
    .load('s3a://gdelt-open-data/events/*', schema = customSchema)

def modify_values(r,y):
   if r == '' and y != '':
    return y
   else:
        return r

def country_exists(r):
   if r in tofullname:
        return tofullname[r]
   else:
    return ''

def event_exists(r):
   if r in toevent:
    return toevent[r]
   else:
    return ''


modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists,StringType())
e_exists = udf(event_exists,StringType())
dfsub1 = df.withColumn("Actor1Type1Code",modify_val(col("Actor1Type1Code"),col("Actor2Type1Code"))) \
           .withColumn("ActionGeo_CountryCode",c_exists(col("ActionGeo_CountryCode"))) \
           .withColumn("Actor1Type1Code",e_exists(col("Actor1Type1Code")))

sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               SQLDATE, MonthYear, Year,
                               Actor1Type1Code,
                               NumArticles,
                               GoldsteinScale,
                               AvgTone
                          FROM temp
                         WHERE ActionGeo_CountryCode <> ''
                            AND Actor1Type1Code <> ''
                            AND NumArticles <> ''
                            AND GoldsteinScale <> ''
                            AND AvgTone <> ''""")

sqlContext.registerDataFrameAsTable(df2, 'temp2')
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
                               Actor1Type1Code,
                               CAST(NumArticles AS INTEGER),
                               CAST(GoldsteinScale AS INTEGER),
                               CAST(AvgTone AS INTEGER)
                          FROM temp2""")

sqlContext.registerDataFrameAsTable(df3, 'temp3')
sqlContext.cacheTable('temp3')

dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                   SQLDATE,
                                   Actor1Type1Code,
                                   SUM(NumArticles) AS NumArticles,
                                   ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                   ROUND(AVG(AvgTone),0) AS AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                      SQLDATE,
                                      Actor1Type1Code""")

dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                     MonthYear,
                                     Actor1Type1Code,
                                     SUM(NumArticles) AS NumArticles,
                                     ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                     ROUND(AVG(AvgTone),0) as AvgTone
                                FROM temp3
                                   GROUP BY ActionGeo_CountryCode,
                                    MonthYear,
                                    Actor1Type1Code""")

dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                    Year,
                                    Actor1Type1Code,
                                    SUM(NumArticles) AS NumArticles,
                                    ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                    ROUND(AVG(AvgTone),0) as AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                       Year,
                                       Actor1Type1Code""")

def rddCleaning(rd,timeframe):

    def popular_avg(curr_tup):
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
        event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return ((curr_tup[0],lst,dict_matches,curr_tup[3]))

    def merge_info(tup):
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
    main_dict["TotalArticles"] = {"total":tup[3]}
        return ((tup[0],main_dict))

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions":event_tup[1]}
        return ((tup[0],dict_matches,tup[2],tup[3]))

    def sum_allevents(tup):
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
                total_mentions += event[1]
        return ((tup[0],type_lst,tup[2],total_mentions))

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"],y[timeframe]),
                                   ([(y["Actor1Type1Code"],y["NumArticles"])],
                    [(y["Actor1Type1Code"],{"Goldstein":y["GoldsteinScale"],"ToneAvg":y["AvgTone"]})]
                   ))) \
           .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
               .map(lambda v: (v[0],
                                   sorted(v[1][0],key=itemgetter(1),reverse=True),
                                   dict(v[1][1]))) \
           .map(sum_allevents) \
                   .map(popular_avg) \
           .map(event_todict) \
                   .map(merge_info) \
               .map(lambda d: ((d[0][0],d[0][1],d[1])))

    return rdd_format

print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))

【问题讨论】:

  • 对于初学者来说,你会丢失所有的 udf。这些都不是必需的。
  • 感谢您的回复!对此,我真的非常感激。我使用 ufds 是因为我不确定如何在 SQL 中使用 python 字典。在两个 udf 中,我根据存储在另一个 python 文件中的字典重命名行。例如,如果列中的单元格有“US”,那么我在 udf 中将其重命名为“United States”。我可以在 SQL 中使用字典变量吗?
  • 您可以将joinbroadcastliteral map 一起使用。
  • @zero323 我会按照您的建议进行更改。您认为还有什么其他原因导致我的 spark 作业需要很长时间才能运行吗?我可以进行其他配置以使我的工作运行得更快吗?我正在使用一个主节点和三个从节点,处理 100GB 的数据需要三个多小时。
  • pyspark.sql.functions.create_map

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

我能想到的最直接的事情是使用数据帧而不是 RDD。 基本上,由于 python 和 JVM 之间的转换,python 中的 RDD 比 scala 慢得多。数据帧也有很多优化。

很难按照这里的所有代码来尝试建议转换,但是,作为基础,您可以使用 spark.read.csv 从 csv 直接读取数据帧(并设置一个架构,以便很多验证将自动发生),并且许多现有的函数应该使它易于编写。

【讨论】:

  • 如果我将 csv 作为数据框读取,那么我不能再执行 .map 和 .reduce 函数了,对吧?我需要 .map 和 .reduce 对我的数据进行多次转换。我正在使用一个主人和三个奴隶,我想利用我所有的资源。如果我使用 RDD,那么它会将任务并行化到其他节点。
  • @CatherineAlv 如果你有一些细节,假设你在 RDD 和 DataFrames 之间来回走动
  • 快速浏览一下您的代码似乎表明您在 .map 和 .reduce 中所做的一切都可以使用数据帧(通过使用数据帧函数)来完成。 Spark 数据帧(它是 pyspark.sql 中 spark sql 包的一部分)就像 RDD 一样分布(并且更加优化)。如果缺少某些东西,您通常可以使用 UDF 解决它(尽管 python UDF 比 scala 慢,并且肯定比 pyspark.sql.functions 中的可用函数慢)。尝试查看 spark sql 的编程指南以获取更多信息
  • @AssafMendelson 您对我所做的更新还有其他建议吗?我可以进行其他优化吗?
  • 看看内置函数。使用内置函数比 UDF 快得多。例如,我看到您正在定义 modify_val UDF。相反,您可以使用 when,否则。请参阅 pyspark.sql.functions。
猜你喜欢
  • 2015-03-11
  • 1970-01-01
  • 2014-01-04
  • 2020-11-20
  • 1970-01-01
  • 2010-10-09
  • 2013-04-15
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多