以下是获取相关字符串列统计信息的示例:
def getStringColumnProfile(df: DataFrame, columnName: String): DataFrame = {
df.select(columnName)
.withColumn("isEmpty", when(col(columnName) === "", true).otherwise(null))
.withColumn("isNull", when(col(columnName).isNull, true).otherwise(null))
.withColumn("fieldLen", length(col(columnName)))
.agg(
max(col("fieldLen")).as("max_length"),
countDistinct(columnName).as("unique"),
count("isEmpty").as("is_empty"),
count("isNull").as("is_null")
)
.withColumn("col_name", lit(columnName))
}
def profileStringColumns(df: DataFrame): DataFrame = {
df.columns.filter(df.schema(_).dataType == StringType)
.map(getStringColumnProfile(df, _))
.reduce(_ union _)
.toDF
.select("col_name"
, "unique"
, "is_empty"
, "is_null"
, "max_length")
}
数字列也是如此
def getNumericColumnProfile(df: DataFrame, columnName: String): DataFrame = {
df.select(columnName)
.withColumn("isZero", when(col(columnName) === 0, true).otherwise(null))
.withColumn("isNull", when(col(columnName).isNull, true).otherwise(null))
.agg(
max(col(columnName)).as("max"),
count("isZero").as("is_zero"),
count("isNull").as("is_null"),
min(col(columnName)).as("min"),
avg(col(columnName)).as("avg"),
stddev(col(columnName)).as("std_dev")
)
.withColumn("col_name", lit(columnName))
}
def profileNumericColumns(df: DataFrame): DataFrame = {
df.columns.filter(
Set("DecimalType", "IntegerType", "LongType", "DoubleType", "FloatType", "ShortType")
contains df.schema(_).dataType.toString
)
.map(getNumericColumnProfile(df, _))
.reduce(_ union _)
.toDF
.select("col_name",
"col_type",
"is_null",
"is_zero",
"min",
"max",
"avg",
"std_dev")
}