【Question title】: Logistic regression with spark ml (data frames)
【Posted】: 2016-09-13 17:21:43
【Question】:

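I wrote the following code for logistic regression, and I want to use the Pipeline API provided by spark.ml. However, when I try to print the coefficients and intercept, it raises an error. I am also having trouble computing the confusion matrix and other metrics such as precision and recall.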

#Logistic Regression:
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.sql  import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


sc = SparkContext("local", "predictive")
sqlContext=SQLContext(sc)

df = sqlContext.read.load('/user/bna_ads_final.csv', 
                      format='com.databricks.spark.csv', 
                      header='true', 
                      inferSchema='true')

df.show(5)
df.count()
df.dtypes  
df=df.withColumn("load_date",df.load_date.cast("timestamp"))
df_withday= df.withColumn("day",dayofmonth(df.load_date))
df_new=df_withday.withColumn("Month",month(df.load_date))
df_new=df_new.withColumn("classname",df_new.classname.cast("string"))
ignore = ["load_date","wo_flag","serialnumber", "classname"]

def modify_values(r):
    if r == "A" or r == "B":
        return "dispatch"
    else:
        return "non-dispatch"

def show_metrics(metrics):
    # Overall statistics
    precision = metrics.precision()
    recall = metrics.recall()
    f1Score = metrics.fMeasure()
    print("Summary Stats")
    print("Precision = %s" % precision)
    print("Recall = %s" % recall)
    print("F1 Score = %s" % f1Score)
    print(metrics.confusionMatrix())

ol_val = udf(modify_values, StringType())
df_final = df_new.withColumn("wo_flag",ol_val(df_new.wo_flag))
indexer= StringIndexer(inputCol="classname", outputCol="classnamecat")
indexed = indexer.fit(df_final).transform(df_final)
indexed=indexed.withColumn("classnamecat",indexed.classnamecat.cast("int"))
indexed.show(5)
(trainingData, testData) = indexed.randomSplit([0.7, 0.3])
assembler = VectorAssembler(inputCols=[x for x in indexed.columns if x not in ignore],outputCol='features')
stringindexer=StringIndexer(inputCol="wo_flag", outputCol="labellr")
Classifier= LogisticRegression(labelCol="labellr", featuresCol="features")
pipeline=Pipeline(stages=[stringindexer,assembler,Classifier])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

selected = predictions.select("features", "labellr", "probability", "prediction")
for row in selected.collect():
    print(row)


evaluator = MulticlassClassificationEvaluator(
    labelCol="labellr", predictionCol="prediction", metricName="precision")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
print("Accuracy= %g" % (accuracy))

print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

The error I get is:

print("Coefficients: " + str(model.coefficients))
AttributeError: 'PipelineModel' object has no attribute 'coefficients'

I have Spark 1.5 installed on a Hadoop cluster and will not be able to upgrade any time soon. Is there a way to work around this?

+--------------------+------------+---------+-----------------+---+-----+------------+------------+
|           load_date|           r|classname|mstatus34_timdiff|day|Month|classnamecat|serialnumber|
+--------------------+------------+---------+-----------------+---+-----+------------+------------+
|2013-12-29 10:55:...|non-dispatch|     6634|               19|  1|    7|         0.0|      231234|
|2014-10-05 23:43:...|non-dispatch|     6634|                4|  5|   10|         0.0|      342345|
|2014-10-09 09:39:...|    dispatch|     5886|               36|  9|   10|         1.0|      563472|
|2014-09-16 09:47:...|    dispatch|     6634|               53| 16|    9|         0.0|      134657|
+--------------------+------------+---------+-----------------+---+-----+------------+------------+

【Comments】:

    Tags: python pyspark pipeline logistic-regression apache-spark-ml


    【Solution 1】:

    You can access the individual stages through the stages attribute of the PipelineModel:

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
    from pyspark.ml.feature import VectorAssembler
    
    df = sc.parallelize([
        (0.0, 1.0, 2.0, 4.0),
        (1.0, 3.0, 4.0, 5.0)
    ]).toDF(["label", "x1", "x2", "x3"])
    
    assembler = (VectorAssembler()
        .setInputCols(df.columns[1:])
        .setOutputCol("features"))
    
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    
    pipeline = Pipeline(stages=[assembler, lr])
    model = pipeline.fit(df)
    
    [stage.coefficients for stage in model.stages if hasattr(stage, "coefficients")]
    ## [DenseVector([2.1178, 1.6843, -1.8338])]
    
    ## or
    
    [stage.coefficients for stage in model.stages
        if isinstance(stage, LogisticRegressionModel)]
    ## [DenseVector([2.1178, 1.6843, -1.8338])]
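
One possible reason for an empty list on older releases: as far as I know, the coefficients property was only added to PySpark's LogisticRegressionModel in Spark 1.6, while Spark 1.5 exposes the same vector as weights. The following is a minimal sketch of a lookup that tolerates both names. The helper name lr_params is hypothetical, and the function relies only on duck typing over model.stages, so treat it as an illustration rather than a tested fix for the cluster in the question.

```python
def lr_params(pipeline_model):
    """Return (coefficients, intercept) from the first fitted stage
    that exposes either `coefficients` or the older `weights` name.

    `pipeline_model` is assumed to be a fitted PipelineModel, i.e. any
    object with a `stages` list.
    """
    for stage in pipeline_model.stages:
        # Try the newer attribute first so that releases which still
        # carry the deprecated `weights` do not emit a warning.
        for name in ("coefficients", "weights"):
            vector = getattr(stage, name, None)
            if vector is not None:
                return vector, stage.intercept
    raise ValueError("no stage exposes coefficients or weights")


# Usage with the pipeline fitted above (assumption: `model` is that PipelineModel):
# coefficients, intercept = lr_params(model)
# print("Coefficients: " + str(coefficients))
# print("Intercept: " + str(intercept))
```

Stages such as VectorAssembler or a fitted StringIndexer have neither attribute, so they are skipped and only the regression stage is returned.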
    

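    【Comments】:

    • Thanks for your answer, but this does not print the coefficients; it just gives me empty brackets: []
    • As long as the pipeline contains the right model, it should work fine. See minimal reproducible example.
    • Hi @zero323, I have attached what the transformed data looks like. The model works fine when I use RDDs and build the features and labels with a lambda function and LabeledPoint. But it fails on data frames: neither my function for computing metrics nor printing the coefficients works.
    【Solution 2】:

    Try this: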

    pipeline=Pipeline(stages=[assembler, lr])
    model = pipeline.fit(trainingData)
    lrm = model.stages[-1]
    
    lrm.coefficients
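
The question also asks about the confusion matrix, precision and recall, which neither answer covers. The show_metrics function in the question calls methods that match pyspark.mllib.evaluation.MulticlassMetrics, which expects an RDD of (prediction, label) pairs rather than a DataFrame. As an alternative, here is a minimal sketch that derives the same binary statistics from grouped counts. The helper name binary_metrics is hypothetical, and which index counts as the positive class is an assumption: StringIndexer assigns 0.0 to the most frequent label, so check which index "dispatch" received.

```python
def binary_metrics(counts, positive=1.0):
    """Compute a 2x2 confusion matrix, precision, recall and F1.

    `counts` is an iterable of (label, prediction, n) tuples, one per
    observed label/prediction combination. `positive` is the label
    index treated as the positive class (an assumption, see above).
    """
    tp = fp = fn = tn = 0
    for label, prediction, n in counts:
        if prediction == positive and label == positive:
            tp += n
        elif prediction == positive:
            fp += n
        elif label == positive:
            fn += n
        else:
            tn += n
    precision = tp / float(tp + fp) if tp + fp else 0.0
    recall = tp / float(tp + fn) if tp + fn else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {
        # Rows are actual classes, columns are predicted classes.
        "confusion": [[tn, fp], [fn, tp]],
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Usage with the predictions DataFrame from the question (assumption:
# columns "labellr" and "prediction" exist as in the original code).
# Each collected Row unpacks as (labellr, prediction, count):
# rows = predictions.groupBy("labellr", "prediction").count().collect()
# print(binary_metrics(rows))
```

Only the handful of grouped rows is collected to the driver, so the aggregation itself still runs on the cluster.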
    

    【Comments】:
