【问题标题】:How to do One Hot Encoding for Linear Regression in Spark with Python?如何使用 Python 对 Spark 中的线性回归进行一次热编码?
【发布时间】:2015-12-02 15:44:43
【问题描述】:

我有我为Random Forest regression 编码编写的这段代码。但是Random Forest regressionindexer 之后不需要One Hot Encoding。现在我想试试Linear Regression,它需要One Hot Encoding。我浏览了 Sparks One Hot Encoder 文档,但不知道如何将其合并到我当前的代码中。如何在当前代码中添加One Hot Encoding 步骤?

from pyspark.ml.feature import StringIndexer
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import VectorAssembler
import org.apache.spark.ml.feature.OneHotEncoder

label_col = "x4"

# converting RDD to dataframe
train_data_df = train_data.toDF(("x0","x1","x2","x3","x4"))

# Indexers encode strings with doubles
string_indexers = [
   StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
   for x in train_data_df.columns if x != label_col
]

# Assembles multiple columns into a single vector
assembler = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in train_data_df.columns if x != label_col],
    outputCol="features"
)


pipeline = Pipeline(stages=string_indexers + [assembler])
model = pipeline.fit(train_data_df)
indexed = model.transform(train_data_df)

label_points = (indexed
.select(col(label_col).cast("double").alias("label"), col("features"))
.map(lambda row: LabeledPoint(row.label, row.features)))

更新:

from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel


###### FOR TEST DATA ######
label_col_test = "x4"

# converting RDD to dataframe
test_data_df = test_data.toDF(("x0","x1","x2","x3","x4"))

# Indexers encode strings with doubles
string_indexers_test = [
   StringIndexer(inputCol=x, outputCol="idx_{0}".format(x))
   for x in testData_df_1.columns if x != label_col_test
]

# encoders
encoders_test = [
   StringIndexer(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
   for x in testData_df_1.columns if x != label_col_test
]

# Assembles multiple columns into a single vector
assembler_test = VectorAssembler(
    inputCols=["idx_{0}".format(x) for x in testData_df_1.columns if x != label_col_test],
    outputCol="features"
)


pipeline_test = Pipeline(stages=string_indexers_test + encoders_test + [assembler_test])
model_test = pipeline_test.fit(test_data_df)
indexed_test = model_test.transform(test_data_df)

label_points_test = (indexed_test
    .select(col(label_col_test).cast("float").alias("label"), col("features"))
    .map(lambda row: LabeledPoint(row.label, row.features)))

# Build the model
model = LinearRegressionWithSGD.train(label_points)

valuesAndPreds = label_points_test.map(lambda p: (p.label, model.predict(p.features)))

MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))

【问题讨论】:

  • 你能告诉我 PySpark 中哪些机器学习模型需要一个热编码,哪些不需要?我想不通

标签: python encoding apache-spark machine-learning pyspark


【解决方案1】:

您可以简单地将其添加为索引和组装之间的一个步骤:

encoders = [
   StringIndexer(inputCol="idx_{0}".format(x), outputCol="enc_{0}".format(x))
   for x in train_data_df.columns if x != label_col
]

assembler = VectorAssembler(
    inputCols=[
        "enc_{0}".format(x) for x in train_data_df.columns if x != label_col
    ],
    outputCol="features"
)


pipeline = Pipeline(stages=string_indexers + encoders + [assembler])

【讨论】:

  • 当我在 Spark 中运行线性回归时,我得到 MSEinf。我的代码中有问题。我已经在上面的更新部分发布了我的其余代码。
  • 这个问题在 SO 上已经被多次讨论过了。如果你不调整你的模型是你可以期待的。
  • @JasonDonnald 调整模型并非易事。您是否尝试过对超参数进行网格搜索?
  • 一种简单的方法是遍历条目列表以获得迭代次数,应用模型并评估以保留最佳模型的迭代次数。你也可以在里面申请简历。您可以将其应用于模型的所有参数(超参数)。这称为网格搜索。
  • 我建议您阅读 this 关于 scikit.learn 的网格搜索,这在理论上是相似的。
猜你喜欢
  • 2017-05-10
  • 2022-09-27
  • 2020-11-14
  • 2018-04-28
  • 2021-11-16
  • 2020-05-30
  • 2018-07-23
  • 2022-01-24
相关资源
最近更新 更多