Posted: 2019-12-17 17:02:05
Question:
I'm running a Spark program that converts TSV to Parquet. When I write the output, I get this error:
java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 5 fields are required while 7 values are provided.
I'm not doing much, just applying field names and casting types before writing the data.
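from pyspark.sql import functions as sf
from pyspark.sql import types

# sc is an existing SparkContext; each input line is split into a list of tab-separated values
rdd = sc.textFile('s3://in-location/').map(lambda x: x.split('\t'))
df = rdd.toDF()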
# Rename the positional columns _1.._7; if a position is absent, add it as a null column
if "_1" in df.columns:
    df = df.withColumnRenamed("_1", "ts")
else:
    df = df.withColumn("ts", sf.lit(None))
if "_2" in df.columns:
    df = df.withColumnRenamed("_2", "ts_offset")
else:
    df = df.withColumn("ts_offset", sf.lit(None))
if "_3" in df.columns:
    df = df.withColumnRenamed("_3", "id")
else:
    df = df.withColumn("id", sf.lit(None))
if "_4" in df.columns:
    df = df.withColumnRenamed("_4", "testing")
else:
    df = df.withColumn("testing", sf.lit(None))
if "_5" in df.columns:
    df = df.withColumnRenamed("_5", "value")
else:
    df = df.withColumn("value", sf.lit(None))
if "_6" in df.columns:
    df = df.withColumnRenamed("_6", "version")
else:
    df = df.withColumn("version", sf.lit(None))
if "_7" in df.columns:
    df = df.withColumnRenamed("_7", "size")
else:
    df = df.withColumn("size", sf.lit(None))
df = df.withColumn("ts", df["ts"].cast(types.TimestampType()))
df = df.withColumn("ts_offset", df["ts_offset"].cast(types.ShortType()))
df = df.withColumn("id", df["id"].cast(types.StringType()))
df = df.withColumn("testing", df["testing"].cast(types.BooleanType()))
df = df.withColumn("value", df["value"].cast(types.StringType()))
df = df.withColumn("version", df["version"].cast(types.StringType()))
df = df.withColumn("size", df["size"].cast(types.StringType()))
outDf = df.select("ts", "ts_offset", "id", "testing", "value", "version", "size")
if "ts" not in outDf.columns:
    outDf = outDf.withColumn("ts", sf.lit(None).cast(types.TimestampType()))
if "ts_offset" not in outDf.columns:
    outDf = outDf.withColumn("ts_offset", sf.lit(None).cast(types.ShortType()))
if "id" not in outDf.columns:
    outDf = outDf.withColumn("id", sf.lit(None).cast(types.StringType()))
if "testing" not in outDf.columns:
    outDf = outDf.withColumn("testing", sf.lit(None).cast(types.BooleanType()))
if "value" not in outDf.columns:
    outDf = outDf.withColumn("value", sf.lit(None).cast(types.StringType()))
if "version" not in outDf.columns:
    outDf = outDf.withColumn("version", sf.lit(None).cast(types.StringType()))
if "size" not in outDf.columns:
    outDf = outDf.withColumn("size", sf.lit(None).cast(types.StringType()))
outDf.printSchema()
outDf\
.repartition(48)\
.write.mode("append")\
.partitionBy("id")\
.parquet("s3://out-location/")
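A likely explanation (an editorial reading, not confirmed in the post): when rdd.toDF() is called without a schema, PySpark infers the schema from the first row of the RDD, and only looks at further rows if that first row contains nulls. If the first line has 5 tab-separated values, the DataFrame gets columns _1 to _5, the "_6"/"_7" checks fall through to lit(None), and the job fails at write time as soon as a 7-value row is converted against the 5-field schema. The pure-Python sketch below uses two made-up sample lines and a hypothetical helper, field_counts, to show the mismatch without Spark.

```python
from collections import Counter


def field_counts(lines, sep="\t"):
    """Hypothetical helper: count how many lines split into each number of fields."""
    return Counter(len(line.split(sep)) for line in lines)


# Made-up sample: an older 5-column line followed by a newer 7-column line.
sample = [
    "2019-12-17 10:00:00\t0\tabc\ttrue\t42",
    "2019-12-17 10:00:01\t0\tdef\tfalse\t43\tv2\t1024",
]

counts = field_counts(sample)
print(dict(counts))  # {5: 1, 7: 1}

# Roughly what schema inference does here: the width comes from the first row.
schema_width = len(sample[0].split("\t"))
mismatched = [line for line in sample if len(line.split("\t")) != schema_width]
print(schema_width, len(mismatched))  # 5 1
```

Any line in the mismatched list is one that Spark would reject with the "5 fields are required while 7 values are provided" error.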
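I've been trying to make sure every column exists in the final outDf, even though not every column appears in every row of the input files. We recently added 2 new columns, so about half of the data is missing them. That's why I check for each column and add a lit(None) value when it doesn't exist. My guess is that only some rows are missing data and that's why it complains, but I'm not sure how to fix it. Ideally, when data is missing, the corresponding Parquet value would just be null. How would I achieve that?
Thanks in advance!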
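One way to get nulls for the missing trailing columns, sketched under the assumption that the 2 new columns (version, size) are always the last two fields of a line: pad every split line to the full column list before building the DataFrame, and pass an explicit schema instead of letting toDF() infer one. The names COLUMNS, pad_fields and read_padded_tsv are hypothetical; spark is assumed to be an existing SparkSession. Every field is read as a string so the existing casts still apply.

```python
# Hypothetical sketch: make every row the same width before Spark sees it.
COLUMNS = ["ts", "ts_offset", "id", "testing", "value", "version", "size"]


def pad_fields(line, n=len(COLUMNS), sep="\t"):
    """Split a TSV line and return exactly n values, padding with None.

    Values beyond n are dropped (an assumption; you may prefer to fail instead).
    """
    fields = line.split(sep)
    return (fields + [None] * n)[:n]


def read_padded_tsv(spark, path):
    """Build a DataFrame in which missing trailing fields are null."""
    # Imported here so pad_fields can be used without Spark installed.
    from pyspark.sql import types

    # Explicit, all-nullable string schema: no inference from the first row.
    schema = types.StructType(
        [types.StructField(name, types.StringType(), True) for name in COLUMNS]
    )
    rdd = spark.sparkContext.textFile(path).map(pad_fields)
    return spark.createDataFrame(rdd, schema=schema)
```

With df = read_padded_tsv(spark, 's3://in-location/'), the _1.._7 rename branches become unnecessary, and the same casts and partitioned Parquet write can follow; a null string casts to null for every target type. An untested alternative is spark.read.csv with sep set to a tab and an explicit schema: Spark's documentation for the default PERMISSIVE mode says that records with fewer tokens than the schema get null for the extra fields.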
Tags: dataframe pyspark apache-spark-sql parquet