【Posted】: 2019-01-21 06:04:42
【Question】:
Column names:
Production_unit_id,batch_id,items_produced,items_discarded
Data:
P188 gv962 {'scissor': 141, 'paper': 274, 'rock': 218} {'scissor': 14,'paper': 135, 'rock': 24}
P258 mr005 {'scissor': 151, 'paper': 143, 'rock': 225} {'scissor': 24, 'paper': 60, 'rock': 17}
Code:
from pyspark.sql.types import *
sc = spark.sparkContext
production_rdd = sc.textFile("/Production_logs.tsv")
production_parts = production_rdd.map(lambda l: l.split("\t"))
production = production_parts.map(lambda p: (p[0], p[1], p[2], p[3].strip()))
schemaStringProduction = "production_unit_id batch_id items_produced items_discarded"
fieldsProduction = [StructField(field_name, StringType(), True) for field_name in schemaStringProduction.split()]
schemaProduction = StructType(fieldsProduction)
schemaProductionDF = spark.createDataFrame(production, schemaProduction)
I am trying to explode the items_produced column:
from pyspark.sql.functions import explode
exploding = schemaProductionDF.select("production_unit_id", explode("items_produced").alias("item_p", "item_p_count"), "items_discarded")
I get this error:
pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(`items_produced`)' due to data type mismatch:
input to function explode should be array or map type, not string;
Please help.
【Comments】:
-
All columns in your schema fieldsProduction = [StructField(field_name, StringType(), True) for field_name in schemaStringProduction.split()] are set to StringType. Change them to the correct data types.
-
Hi Suresh, which data type should I use?
-
You can use MapType(StringType(), LongType()) for the columns items_produced and items_discarded.
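Following up on the MapType suggestion: changing the schema alone is probably not enough, because textFile yields plain strings and createDataFrame will likely reject a str where a map is declared. Below is a minimal sketch, assuming every line holds the four tab-separated fields shown in the question and the file has no header row. parse_line and explode_items_produced are hypothetical helper names, and parsing the dict-like strings with ast.literal_eval is an assumption of this sketch, not something stated in the thread.

```python
import ast


def parse_line(line):
    """Split one tab-separated log line and turn the two dict-like
    strings into real Python dicts, so Spark can store them as maps."""
    unit_id, batch_id, produced, discarded = line.split("\t")
    return (
        unit_id,
        batch_id,
        ast.literal_eval(produced),           # "{'rock': 218}" -> {'rock': 218}
        ast.literal_eval(discarded.strip()),  # strip the trailing newline first
    )


def explode_items_produced(spark, path="/Production_logs.tsv"):
    # pyspark is imported here so parse_line stays usable without Spark
    from pyspark.sql.functions import explode
    from pyspark.sql.types import (LongType, MapType, StringType,
                                   StructField, StructType)

    counts = MapType(StringType(), LongType())
    schema = StructType([
        StructField("production_unit_id", StringType(), True),
        StructField("batch_id", StringType(), True),
        StructField("items_produced", counts, True),
        StructField("items_discarded", counts, True),
    ])
    rows = spark.sparkContext.textFile(path).map(parse_line)
    df = spark.createDataFrame(rows, schema)
    # explode on a map column yields one row per entry (key, value)
    return df.select(
        "production_unit_id",
        explode("items_produced").alias("item_p", "item_p_count"),
        "items_discarded",
    )
```

With an active session, explode_items_produced(spark).show() should give one row per item in items_produced. Spark allows only one generator per select clause, so items_discarded would need to be exploded in a separate select.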
Tags: apache-spark pyspark apache-spark-sql pyspark-sql