kai-xuan

1,SparkContext和sparkSession的区别

1-1、#构建SparkContext,读取文件

    from pyspark import *
    import os
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"

    conf = SparkConf().setAppName("appname").setSparkHome("/opt/cloudera/parcels/SPARK2/lib/spark2").setMaster("yarn")
    sc = SparkContext(conf=conf)
    rdd = sc.textFile("hdfs://part*",10)
1-2、#构建SparkSession  执行hive语句(这个也能完成1中的功能,所以能用这个尽量用这个)

    from pyspark.sql import *
    spark = SparkSession.builder.appName("appName").master("yarn").enableHiveSupport().getOrCreate()
    get_duration_sql=\'\'\'
    select
    id,duration
    from
    *_db.da_*
    where
    day=\'20210126\' and duration is not null
\'\'\'
rdd=spark.sql(get_duration_sql).rdd

SparkSession是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
 在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过sparkcontext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context。例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用hiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
 SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的
资料链接:https://blog.csdn.net/beautiful_huang/article/details/103820534

2,sparkSession的使用

    if len(sys.argv) < 4:
        print(\'input error\')
    directory = sys.argv[1]
    day = sys.argv[2]
    hour = sys.argv[3]
    if hour=="24":
        hour="*"
    # 配置环境
    os.environ["YARN_CONF_DIR"] = "/etc/spark2/conf/yarn-conf"
    # 实例化spark
    spark = SparkSession.builder. \
        appName("spark_slot_eva"). \
        config("spark.sql.shuffle.partitions", 10). \
        config("spark.default.parallelism", 1600). \
        config("hive.warehouse.subdir.inherit.perms", "false"). \
        config("spark.executor.cores", "4"). \  #spark.executor.cores:顾名思义这个参数是用来指定executor的cpu内核个数,分配更多的内核意味着executor并发能力越强,能够同时执行更多的task
        config("spark.executor.instances", "200"). \ #能够启动的executor的个数
        config("spark.executor.memory", "4g"). \  #executor memory是每个节点上占用的内存。每一个节点都可使用内存
        config("spark.port.maxRetries","100").\   #最大重试次数,不设置的话默认4,容易启动spark失败
        config("spark.yarn.executor.memoryOverhead","26g").\
        enableHiveSupport(). \
        getOrCreate()
    sc = spark.sparkContext
    # 读取样本数据集和slot.conf
    dir_list=[directory,day,hour,"part*"]
    final_dir="/".join(dir_list)
    print(final_dir)
    rdd = sc.textFile(final_dir)
    # print(\'------------------rdd----------------\', rdd.first())
    # 2.计算数据的总行数
    line_num = rdd.count()
    print(\'-------------------------------------line_num--------------\', line_num)
     # 计算特征覆盖率
    dict1 = get_slot_cover(rdd,line_num)
    dict2 = get_slot_num(rdd)
    dict3 = get_slot_num_avg(rdd,line_num)
    res2hive(spark,day,hour,dict1,dict2,dict3)
spark.executor.memory  + spark.yarn.executor.memoryOverhead  < 30G  在运行的有这个限制 30G为集群内存限制(公司环境限制)

 

   

spark.default.parallelism    并行度问题,如果不设置这个参数,Spark 会跟据 HDFS 中 Block 的个数去设置这一个数量,
原理是默应每个 Block 会对应一个 Task,默应情况下,如果数据量不是太多就不可以充份利用 executor 设置的资源,
就会浪费了资源。建义设置为 100个,最好 700个左右。Spark官方的建义是每一个 Core 负责 2-3 个 Task
4*200 *2(ccores*instances)

 

分类:

技术点:

相关文章: