【问题标题】:Grouped linear regression in SparkSpark中的分组线性回归
【发布时间】:2016-03-04 02:55:03
【问题描述】:

我在 PySpark 工作,我想找到一种方法来对数据组执行线性回归。具体给出这个数据框

import pandas as pd
pdf = pd.DataFrame({'group_id':[1,1,1,2,2,2,3,3,3,3],
                    'x':[0,1,2,0,1,5,2,3,4,5],
                    'y':[2,1,0,0,0.5,2.5,3,4,5,6]})
df = sqlContext.createDataFrame(pdf)

df.show()
# +--------+-+---+
# |group_id|x|  y|
# +--------+-+---+
# |       1|0|2.0|
# |       1|1|1.0|
# |       1|2|0.0|
# |       2|0|0.0|
# |       2|1|0.5|
# |       2|5|2.5|
# |       3|2|3.0|
# |       3|3|4.0|
# |       3|4|5.0|
# |       3|5|6.0|
# +--------+-+---+

我现在希望能够为每个 group_id 拟合一个单独的 y ~ ax + b 模型,并输出一个新的数据框,其中包含 ab 列以及每个组的一行。

例如对于1 组我可以这样做:

from sklearn import linear_model
# Regression on group_id = 1
data = df.where(df.group_id == 1).toPandas()
regr = linear_model.LinearRegression()
regr.fit(data.x.values.reshape(len(data),1), data.y.reshape(len(data),1))
a = regr.coef_[0][0]
b = regr.intercept_[0]
print('For group 1, y = {0}*x + {1}'.format(a, b))
# Repeat for group_id=2, group_id=3

但是为每个组执行此操作需要将数据返回给驱动程序,这并没有利用任何 Spark 并行性。

【问题讨论】:

  • 你有没有尝试过?如果是这样,你能告诉我们你的努力吗?目前还不清楚这里的输入是什么,如何将其转换为 Spark 数据结构,以及要使用哪种工具来创建模型。
  • 什么是group_id?,哪些是特征,哪些是标签?你想用ML还是MLLib
  • 我已编辑以阐明我想要做什么 - 在这种情况下,唯一的功能是 x。很高兴使用 MLMLLib 或其他任何可能合适的东西。
  • Pandas 也能做到这一点吗?

标签: python pandas apache-spark pyspark


【解决方案1】:

这是我找到的解决方案。不要对每组数据执行单独的回归,而是为每组创建一个带有单独列的稀疏矩阵:

from pyspark.mllib.regression import LabeledPoint, SparseVector

# Label points for regression
def groupid_to_feature(group_id, x, num_groups):
    intercept_id = num_groups + group_id-1
    # Need a vector containing x and a '1' for the intercept term
    return SparseVector(num_groups*2, {group_id-1: x, intercept_id: 1.0})

labelled = df.map(lambda line:LabeledPoint(line[2],
                groupid_to_feature(line[0], line[1], 3)))

labelled.take(5)
# [LabeledPoint(2.0, (6,[0,3],[0.0,1.0])),
#  LabeledPoint(1.0, (6,[0,3],[1.0,1.0])),
#  LabeledPoint(0.0, (6,[0,3],[2.0,1.0])),
#  LabeledPoint(0.0, (6,[1,4],[0.0,1.0])),
#  LabeledPoint(0.5, (6,[1,4],[1.0,1.0]))]

然后使用 Spark 的LinearRegressionWithSGD 运行回归:

from pyspark.mllib.regression import LinearRegressionModel, LinearRegressionWithSGD
lrm = LinearRegressionWithSGD.train(labelled, iterations=5000, intercept=False)

此回归的权重包含每个group_id 的系数和截距,即

lrm.weights
# DenseVector([-1.0, 0.5, 1.0014, 2.0, 0.0, 0.9946])

或重新整形为 DataFrame,为每个组提供 ab

pd.DataFrame(lrm.weights.reshape(2,3).transpose(), columns=['a','b'], index=[1,2,3])    
#           a              b
# 1 -0.999990   1.999986e+00
# 2  0.500000   5.270592e-11
# 3  1.001398   9.946426e-01

【讨论】:

  • 非常优雅的解决方案。我想知道当存在大量“组”时,性能与在mapPartitions 调用中应用sklearn 回归有何不同。
  • 非常好的解决方案,但它会扩展到数十万组吗?
  • 如何将 VectorAssembler 与此解决方案一起使用?
猜你喜欢
  • 2015-01-08
  • 2010-11-13
  • 1970-01-01
  • 2016-08-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多