【问题标题】:Schema requiring less fields than declared模式需要的字段少于声明的字段
【发布时间】:2019-12-17 17:02:05
【问题描述】:

我正在运行一个将 TSV 转换为 Parquet 的 Spark 程序。在我写的时候我收到一个错误java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 5 fields are required while 7 values are provided.我没有做太多,只是在写入数据之前应用字段名称和应用类型。

rdd = sc.textFile('s3://in-location/').map(lambda x: x.split('\t'))
df = rdd.toDF()

if "_1" in df.columns:
    df = df.withColumnRenamed("_1", "ts")
else:
    df = df.withColumn("ts", sf.lit(None))

if "_2" in df.columns:
    df = df.withColumnRenamed("_2", "ts_offset")
else:
    df = df.withColumn("ts_offset", sf.lit(None))

if "_3" in df.columns:
    df = df.withColumnRenamed("_3", "id")
else:    
    df = df.withColumn("id", sf.lit(None))   

if "_4" in df.columns:
    df = df.withColumnRenamed("_4", "testing")
else:
   df = df.withColumn("testing", sf.lit(None))

if "_5" in df.columns:
    df = df.withColumnRenamed("_5", "value")
else:
    df = df.withColumn("value", sf.lit(None))

if "_6" in df.columns:
    df = df.withColumnRenamed("_6", "version")
else:
    df = df.withColumn("version", sf.lit(None))

if "_7" in df.columns:
    df = df.withColumnRenamed("_7", "size")
else:
    df = df.withColumn("size", sf.lit(None))


df = df.withColumn("ts", df["ts"].cast(types.TimestampType()))
df = df.withColumn("ts_offset", df["ts_offset"].cast(types.ShortType()))
df = df.withColumn("id", df["id"].cast(types.StringType()))
df = df.withColumn("testing", df["testing"].cast(types.BooleanType()))
df = df.withColumn("value", df["value"].cast(types.StringType()))
df = df.withColumn("version", df["version"].cast(types.StringType()))
df = df.withColumn("size", df["size"].cast(types.StringType()))

outDf = df.select("ts", "ts_offset", "id", "testing", "value", "version", "size")

if "ts" not in outDf.columns:
    outDf = outDf.withColumn("ts", sf.lit(None).cast(types.TimestampType()))

if "ts_offset" not in outDf.columns:
    outDf = outDf.withColumn("ts_offset", sf.lit(None).cast(types.ShortType()))

if "id" not in outDf.columns:
    outDf = outDf.withColumn("id", sf.lit(None).cast(types.StringType()))

if "testing" not in outDf.columns:
    outDf = outDf.withColumn("testing", sf.lit(None).cast(types.BooleanType()))

if "value" not in outDf.columns:
    outDf = outDf.withColumn("value", sf.lit(None).cast(types.StringType()))

if "version" not in outDf.columns:
    outDf = outDf.withColumn("version", sf.lit(None).cast(types.StringType()))

if "size" not in outDf.columns:
    outDf = outDf.withColumn("size", sf.lit(None).cast(types.StringType()))

outDf.printSchema()

outDf\
    .repartition(48)\
    .write.mode("append")\
    .partitionBy("id")\
    .parquet("s3://out-location/")

我一直在努力确保所有列都存在于最终的 outDf 中,即使所有列都没有出现在输入文件的所有行中。我们最近添加了 2 个新列,因此大约一半的数据缺少这些列。这就是为什么我要检查并尝试在列不存在时添加 lit(None) 的值。我猜只有某些行缺少数据,这就是它抱怨的原因,但我不确定如何解决我的问题。理想情况下,如果数据丢失,则 Parquet 中的数据将为空,我将如何实现?

提前致谢!

【问题讨论】:

    标签: dataframe pyspark apache-spark-sql parquet


    【解决方案1】:

    看起来您要附加到的输出位置只有 5 列,而您的数据框是 7 列。所以你不能附加到这个输出位置。尝试保存到其他位置

    【讨论】:

    • 我不相信是这样的。我尝试了不同的位置,结果相同,我认为这是一次运行问题,而不是由于现有数据。
    猜你喜欢
    • 1970-01-01
    • 2020-09-30
    • 2016-10-17
    • 2017-05-08
    • 2011-07-21
    • 2018-07-05
    • 2019-07-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多