【Question Title】: PySpark Pipeline Error when using Indexer and Encoder
【Posted】: 2019-10-28 08:08:41
【Question】:

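I'm using the bank data from UCI to template a project, following the PySpark tutorial on their documentation site (sorry, I can't find the link anymore). I keep getting an error when running the data through the pipeline. I've loaded the data, converted the feature types, and set up the pipeline stages for the categorical and numeric features. I'd welcome feedback on any part of the code, but especially on the part that raises the error, so I can keep moving forward with this build. Thanks in advance!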

Sample data

+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
| id|age|       job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  1| 59|    admin.|married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|       0| unknown|    yes|
|  2| 56|    admin.|married|secondary|     no|     45|     no|  no|unknown|  5|  may|    1467|       1|   -1|       0| unknown|    yes|
|  3| 41|technician|married|secondary|     no|   1270|    yes|  no|unknown|  5|  may|    1389|       1|   -1|       0| unknown|    yes|
|  4| 55|  services|married|secondary|     no|   2476|    yes|  no|unknown|  5|  may|     579|       1|   -1|       0| unknown|    yes|
|  5| 54|    admin.|married| tertiary|     no|    184|     no|  no|unknown|  5|  may|     673|       2|   -1|       0| unknown|    yes|
+---+---+----------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
only showing top 5 rows
# Convert Feature Types
df.createOrReplaceTempView("df")

df2 = spark.sql("select \
                    cast(id as int) as id, \
                    cast(age as int) as age, \
                    cast(job as string) as job, \
                    cast(marital as string) as marital, \
                    cast(education as string) as education, \
                    cast(default as string) as default, \
                    cast(balance as int) as balance, \
                    cast(housing as string) as housing, \
                    cast(loan as string) as loan, \
                    cast(contact as string) as contact, \
                    cast(day as int) as day, \
                    cast(month as string) as month, \
                    cast(duration as int) as duration, \
                    cast(campaign as int) as campaign, \
                    cast(pdays as int) as pdays, \
                    cast(previous as int) as previous, \
                    cast(poutcome as string) as poutcome, \
                    cast(deposit as string) as deposit \
                from df")

# Data Types
df2.dtypes

[('id', 'int'),
 ('age', 'int'),
 ('job', 'string'),
 ('marital', 'string'),
 ('education', 'string'),
 ('default', 'string'),
 ('balance', 'int'),
 ('housing', 'string'),
 ('loan', 'string'),
 ('contact', 'string'),
 ('day', 'int'),
 ('month', 'string'),
 ('duration', 'int'),
 ('campaign', 'int'),
 ('pdays', 'int'),
 ('previous', 'int'),
 ('poutcome', 'string'),
 ('deposit', 'string')]


# Build Pipeline (Error is Here)
categorical_cols = ["job","marital","education","default","housing","loan","contact","month","poutcome"]
numeric_cols = ["age", "balance", "day", "duration", "campaign", "pdays","previous"]

stages = []

stringIndexer = StringIndexer(inputCol=[cols for cols in categorical_cols],
                              outputCol=[cols + "_index" for cols in categorical_cols])

encoder = OneHotEncoderEstimator(inputCols=[cols + "_index" for cols in categorical_cols],
                                 outputCols=[cols + "_classVec" for cols in categorical_cols])

stages += [stringIndexer, encoder]

label_string_id = StringIndexer(inputCol="deposit", outputCol="label")
stages += [label_string_id]

assembler_inputs = [cols + "_classVec" for cols in categorical_cols] + numeric_cols
assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
stages += [assembler]

# Run Data Through Pipeline
pipeline = Pipeline().setStages(stages)
pipeline_model = pipeline.fit(df2)
prepped_df = pipeline_model.transform(df2)

Error

TypeError: Invalid param value given for param "inputCols". Could not convert job_index to list of strings

【Comments】:

    Tags: python apache-spark pyspark pipeline apache-spark-ml


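    【Solution 1】:

    This is because OneHotEncoderEstimator (unlike the legacy OneHotEncoder) takes multiple columns and produces multiple columns (note that both parameter names are plural: Cols, not Col). So you should wrap the arguments of each call in a list,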

    for cols in categorical_cols:
        ...
        encoder = OneHotEncoderEstimator(
          inputCols=[cols + "_index"], outputCols=[cols + "_classVec"]
        )
        ...
    

    or, better, pass all the columns at once, outside the for loop:

    encoder = OneHotEncoderEstimator(
        inputCols=[col + "_index" for col in categorical_cols],
        outputCols=[col + "_classVec" for col in categorical_cols]
    )
    stages += [encoder]
    

    If you're not sure what input/output is expected, you can always check the corresponding Param:

    from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer
    
    OneHotEncoderEstimator.inputCols.typeConverter
    ## <function pyspark.ml.param.TypeConverters.toListString(value)>
    
    StringIndexer.inputCol.typeConverter
    ## <function pyspark.ml.param.TypeConverters.toString(value)>
    

    As you can see, the former requires an object that can be coerced to a list of strings, while the latter takes just a single string.
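Applied to the pipeline in the question, the distinction above suggests one StringIndexer per categorical column (single inputCol/outputCol) followed by a single encoder over all the indexed columns. The following is a minimal sketch of that layout, not a definitive implementation. The helper names `indexed_name`, `encoded_name`, and `build_stages` are hypothetical and do not come from the original post, and the fallback to `OneHotEncoder` on an `ImportError` is an assumption for Spark 3.x, where the estimator was renamed. pyspark is imported inside the function so the naming helpers can be checked without a Spark installation.

```python
def indexed_name(col):
    # Name of the StringIndexer output column for a categorical column.
    return col + "_index"


def encoded_name(col):
    # Name of the one-hot encoded vector column for a categorical column.
    return col + "_classVec"


def build_stages(categorical_cols, numeric_cols, label_col="deposit"):
    """Build pipeline stages: indexers, encoder, label indexer, assembler.

    Hypothetical helper: it only constructs the stages and needs an
    active SparkSession when called.
    """
    from pyspark.ml.feature import StringIndexer, VectorAssembler
    try:
        # Spark 2.3 / 2.4
        from pyspark.ml.feature import OneHotEncoderEstimator as Encoder
    except ImportError:
        # Assumption: Spark 3.x, where OneHotEncoder accepts inputCols/outputCols
        from pyspark.ml.feature import OneHotEncoder as Encoder

    # StringIndexer takes a single column, so create one per categorical column.
    indexers = [
        StringIndexer(inputCol=c, outputCol=indexed_name(c))
        for c in categorical_cols
    ]

    # The encoder takes lists of columns, so one instance covers all of them.
    encoder = Encoder(
        inputCols=[indexed_name(c) for c in categorical_cols],
        outputCols=[encoded_name(c) for c in categorical_cols],
    )

    # Index the string label into a numeric "label" column.
    label_indexer = StringIndexer(inputCol=label_col, outputCol="label")

    # Combine the encoded vectors and the numeric columns into "features".
    assembler = VectorAssembler(
        inputCols=[encoded_name(c) for c in categorical_cols] + list(numeric_cols),
        outputCol="features",
    )

    return indexers + [encoder, label_indexer, assembler]
```

With the lists from the question, `stages = build_stages(categorical_cols, numeric_cols)` could then be passed to `Pipeline(stages=stages)` and fitted on `df2` as before.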

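    【Discussion】:

    • Thanks for the quick reply. That makes sense for the encoder, and it's a better way to write it. I'm still getting an error on the stringIndexer, though: "TypeError: Invalid param value given for param "inputCol". Could not convert to string type". I've updated the code above.
    • That's because StringIndexer is a different beast. It takes a single column (note inputCol, not inputCols) and returns a single column (note outputCol, not outputCols). In other words, watch the plurals, and if in doubt, check the Scala API or the specific Param(s).
    • With a column of strings, do you have to run both StringIndexer() and OneHotEncoderEstimator()? Or can you run just the latter? I'm running Spark 2.3.
    • You have to index first @Chuck - stackoverflow.com/q/35804755/10938362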