用spark做大数据处理,不怕数据大,就怕发生数据倾斜,一发生数据倾斜,轻则spark job要跑很久才能结束,重则OOM,把一个executor的存储空间撑爆,导致程序终止。
一个spark job 是由多个stage组成的 ,stage之间具有先后关系,所以是串行执行的 ,一个stage是由多个task 组成的,每个task之间可以并行运行,一个stage的运行时间由耗时最长的那个task来决定的。
写了一个很简单的spark 程序,将多张表合并起来

import java.text.SimpleDateFormat

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object merge_table {
  def main(args: Array[String]): Unit = {
     val spark =SparkSession.builder().config(new SparkConf()).getOrCreate()
    val formatDate=udf((date:String)=> {
      val time = date.substring(0, 10).toLong
      val newtime: String = new SimpleDateFormat("yyyyMM.0").format(time * 1000)
      newtime
    })

    val basic_train =spark.read.parquet("/user/h_data_platform/platform/mifi/mifi_compete_train_labels/data").drop("effective_time")
    val basic_test  =spark.read.parquet("/user/h_data_platform/platform/mifi/mifi_compete_test_labels/data").drop("effective_time")


    val mibasic_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/mibasic_compete_data")
    val mispend_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/mispend_compete_data")
    val keywords_compete_data= spark.read.parquet("/user/h_mifi/user/mifi_compete/keywords_compete_data")
    val keywords_classapp_micloud_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/keywords_classapp_micloud_compete_data")
    val interest_compete_data =spark.read.parquet("/user/h_mifi/user/mifi_compete/interest_compete_data")

    val miothers_compete_data  =spark.read.parquet("/user/h_mifi/user/mifi_compete/miothers_compete_data")
    var user_interest_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/user_interest_data")
    val user_info = spark.read.parquet("/user/h_mifi/user/mifi_compete/user_info")

    val app_details_1_compete_data =spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_1_compete_data")
    val app_details_2_1_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_2_1_compete_data")
    val app_details_2_2_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_2_2_compete_data")
    val app_details_3_1_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_3_1_compete_data")
    val app_details_3_2_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_3_2_compete_data")

    val interest_data_expr = user_interest_data.columns.filter(x=>x!="user_id" && x!="date").flatMap{x=>
       Seq(first(x).alias(x))
    }
    user_interest_data=user_interest_data
      .drop("date")
      .groupBy("user_id")
      .agg(interest_data_expr.head,interest_data_expr.tail:_*)
      .distinct()


    val table_one = mibasic_compete_data
      .join(mispend_compete_data,Seq("user_id","month"),"left")
      .join(keywords_compete_data,Seq("user_id","month"),"left")
      .join(keywords_classapp_micloud_compete_data,Seq("user_id","month"),"left")
      .join(interest_compete_data,Seq("user_id","month"),"left")
      .join(miothers_compete_data,Seq("user_id","month"),"left")
      .join(app_details_1_compete_data,Seq("user_id","month"),"left")
      .join(app_details_2_1_compete_data,Seq("user_id","month"),"left")
      .join(app_details_2_2_compete_data,Seq("user_id","month"),"left")
      .join(app_details_3_1_compete_data,Seq("user_id","month"),"left")
      .join(app_details_3_2_compete_data,Seq("user_id","month"),"left")
      .join(user_info,Seq("user_id"),"left")
      .distinct()
      .write
      .mode(SaveMode.Overwrite)
      .parquet("/user/h_miui_ad/develop/wangdaopeng/contest/oct/table_one")

因为涉及表与表之间的操作,所以和有可能出现数据倾斜的情况发生,合并的几张表条数都在40w左右。提交任务后,过了一晚上第二天观察sparkUI情况,果然发生了数据倾斜,主要表现在以下几个方面

表现一:某个stage运行时间过长

如下所示,stage12的运行时间长达17.8h,而前12个stage都是在秒级别的时间消耗内完成的,并且
Spark数据倾斜之发现篇

表现二:shuffle read的数据量和shuffle write的数据量相差巨大

stage12的shuffle write的数据量为7.2GB,shuffle write的数据量达到了3.7GB

表现三:task运行时间过长,读写数据量相差巨大

点进去可能存在问题的stage,查看task的运行情况
Spark数据倾斜之发现篇
可以看到一些task的运行时间非常长,并且读写的数据量相差巨大,stage内的task是并行在各个节点上进行运算的 ,耗时最长的的task决定stage的最终完成时间
根据以上三个特征基本上可以判断出该spark任务发生了数据倾斜

相关文章: