When you process big data with Spark, the problem is rarely that the data is large; the real danger is data skew. Once skew occurs, in the mild case the Spark job takes far longer than it should to finish, and in the severe case a single executor's memory fills up, the executor OOMs, and the whole job dies.
A Spark job is made up of multiple stages. Stages depend on one another, so they execute serially. Each stage in turn consists of multiple tasks, which can run in parallel, and a stage's running time is determined by its slowest task.
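A toy illustration of that last point, in plain Scala with made-up task durations:

```scala
object StageTimeDemo extends App {
  // Hypothetical per-task durations (in seconds) for the tasks of one stage.
  // Tasks run in parallel, so the stage finishes only when the slowest one does.
  val taskDurations = Seq(3, 4, 2, 5, 1200) // one badly skewed task

  // Wall-clock time of the stage is bounded below by the slowest task...
  val stageTime = taskDurations.max

  // ...while the total compute is the sum. The gap between the two is
  // parallelism wasted on waiting for the skewed task.
  val totalWork = taskDurations.sum

  println(s"stage time >= ${stageTime}s, total work = ${totalWork}s")
}
```

With a balanced key distribution the five tasks above would each take roughly `totalWork / 5` seconds; with skew, four of them sit idle while one grinds on.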
I wrote a very simple Spark program that merges several tables:
import java.text.SimpleDateFormat

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object merge_table {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(new SparkConf()).getOrCreate()

    // Turn an epoch-millisecond string into a "yyyyMM.0" month label:
    // keep the first 10 digits (seconds), then scale back to milliseconds.
    val formatDate = udf((date: String) => {
      val time = date.substring(0, 10).toLong
      new SimpleDateFormat("yyyyMM.0").format(time * 1000)
    })

    // Train/test labels.
    val basic_train = spark.read.parquet("/user/h_data_platform/platform/mifi/mifi_compete_train_labels/data").drop("effective_time")
    val basic_test = spark.read.parquet("/user/h_data_platform/platform/mifi/mifi_compete_test_labels/data").drop("effective_time")

    // Feature tables to be merged.
    val mibasic_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/mibasic_compete_data")
    val mispend_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/mispend_compete_data")
    val keywords_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/keywords_compete_data")
    val keywords_classapp_micloud_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/keywords_classapp_micloud_compete_data")
    val interest_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/interest_compete_data")
    val miothers_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/miothers_compete_data")
    var user_interest_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/user_interest_data")
    val user_info = spark.read.parquet("/user/h_mifi/user/mifi_compete/user_info")
    val app_details_1_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_1_compete_data")
    val app_details_2_1_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_2_1_compete_data")
    val app_details_2_2_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_2_2_compete_data")
    val app_details_3_1_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_3_1_compete_data")
    val app_details_3_2_compete_data = spark.read.parquet("/user/h_mifi/user/mifi_compete/app_details_3_2_compete_data")

    // Collapse user_interest_data to one row per user_id by taking first()
    // of every non-key column. agg() wants one Column plus varargs, hence head/tail.
    val interest_data_expr = user_interest_data.columns
      .filter(x => x != "user_id" && x != "date")
      .map(x => first(x).alias(x))
    user_interest_data = user_interest_data
      .drop("date")
      .groupBy("user_id")
      .agg(interest_data_expr.head, interest_data_expr.tail: _*)
      .distinct()

    // Left-join all feature tables on (user_id, month), then user_info on user_id.
    val table_one = mibasic_compete_data
      .join(mispend_compete_data, Seq("user_id", "month"), "left")
      .join(keywords_compete_data, Seq("user_id", "month"), "left")
      .join(keywords_classapp_micloud_compete_data, Seq("user_id", "month"), "left")
      .join(interest_compete_data, Seq("user_id", "month"), "left")
      .join(miothers_compete_data, Seq("user_id", "month"), "left")
      .join(app_details_1_compete_data, Seq("user_id", "month"), "left")
      .join(app_details_2_1_compete_data, Seq("user_id", "month"), "left")
      .join(app_details_2_2_compete_data, Seq("user_id", "month"), "left")
      .join(app_details_3_1_compete_data, Seq("user_id", "month"), "left")
      .join(app_details_3_2_compete_data, Seq("user_id", "month"), "left")
      .join(user_info, Seq("user_id"), "left")
      .distinct()

    table_one.write
      .mode(SaveMode.Overwrite)
      .parquet("/user/h_miui_ad/develop/wangdaopeng/contest/oct/table_one")
  }
}
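A side note on the `agg(interest_data_expr.head, interest_data_expr.tail: _*)` call above: Spark's `agg` takes one required `Column` plus varargs, so a dynamically built list of expressions has to be split into head and tail. A minimal plain-Scala sketch of the same calling pattern (the `combine` function below is hypothetical, not a Spark API):

```scala
object VarargsDemo extends App {
  // A hypothetical function with an agg-like signature:
  // one required argument plus varargs.
  def combine(first: String, rest: String*): String =
    (first +: rest).mkString(", ")

  // A dynamically built argument list, like interest_data_expr above.
  val exprs = Seq("col_a", "col_b", "col_c")

  // The head/tail split turns the list into (required arg, varargs).
  val result = combine(exprs.head, exprs.tail: _*)
  println(result) // col_a, col_b, col_c
}
```

The `: _*` ascription is what tells the compiler to splat the sequence into the varargs parameter; without the head/tail split, a `Seq[Column]` cannot be passed to `agg` directly.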
Because the job is dominated by table-to-table joins, data skew was a real possibility; each of the tables being merged has around 400k rows. I submitted the job, left it overnight, and when I checked the Spark UI the next morning, skew had indeed occurred. It showed up in the following ways.
Symptom 1: one stage runs far too long
As shown below, stage 12 ran for 17.8 h, while the 12 stages before it each finished within seconds.
Symptom 2: shuffle read and shuffle write volumes differ enormously
Stage 12's shuffle read was 7.2 GB, while its shuffle write reached 3.7 GB.
Symptom 3: individual tasks run too long, with wildly uneven read/write volumes
Click into the suspect stage and inspect its tasks.
Some tasks run far longer than the rest, and the amounts of data they read and write differ hugely. Since the tasks within a stage run in parallel across the cluster's nodes, the slowest task determines when the stage finishes.
From these three symptoms we can be fairly confident that this Spark job suffered from data skew.
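One way to confirm the diagnosis is to look at the distribution of the join key itself; in Spark that would be along the lines of `df.groupBy("user_id").count().orderBy(desc("count"))`. The idea, simulated here in plain Scala over a made-up key sample so it runs without a cluster:

```scala
object SkewCheckDemo extends App {
  // Made-up sample of join keys; "u1" is a hot key.
  val keys = Seq("u1", "u1", "u1", "u1", "u1", "u1", "u1", "u1", "u2", "u3")

  // Count rows per key, like groupBy("user_id").count() in Spark.
  val counts: Map[String, Int] =
    keys.groupBy(identity).map { case (k, v) => k -> v.size }

  // Ratio of the hottest key to the average rows-per-key: a large value
  // signals skew, because all of that key's rows hash to one shuffle
  // partition and one task must process them alone.
  val avg       = keys.size.toDouble / counts.size
  val hottest   = counts.values.max
  val skewRatio = hottest / avg

  println(f"hottest key carries $hottest rows, skew ratio = $skewRatio%.1f")
}
```

Running the equivalent `groupBy`/`count` on the real tables before the big multi-way join would have revealed the hot keys up front, instead of after 17.8 hours in the Spark UI.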