【Question Title】: Does CrossValidator in PySpark distribute the execution?
【Posted】: 2023-03-17 17:19:01
【Question】:

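I am working on machine learning in PySpark and using a RandomForestClassifier. Until now I have been using Sklearn. I am using CrossValidator to tune the parameters and get the best model. Below is sample code taken from the Spark website.

From what I have read so far, I can't tell whether Spark also distributes the parameter tuning, or whether it behaves the same way as Sklearn's GridSearchCV.

Any help would be much appreciated.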

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Prepare training documents, which are labeled.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0),
    (4, "b spark who", 1.0),
    (5, "g d a y", 0.0),
    (6, "spark fly", 1.0),
    (7, "was mapreduce", 0.0),
    (8, "e spark program", 1.0),
    (9, "a e c l", 0.0),
    (10, "spark compile", 1.0),
    (11, "hadoop software", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
# This will allow us to jointly choose parameters for all Pipeline stages.
# A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
# this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
paramGrid = ParamGridBuilder() \
    .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  # use 3+ folds in practice

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(training)

【Comments】:

  • Any hints are welcome, and please let me know if the question is unclear.

Tags: apache-spark machine-learning parameters pyspark


【Answer 1】:

Spark 2.3+

SPARK-21911 added parallel model fitting. The level of parallelism is controlled by the parallelism Param.
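As a minimal sketch of how that Param is set (assuming Spark 2.3 or later), the helper below builds a CrossValidator with the `parallelism` keyword. The helper name `make_parallel_cv` and the default values of 3 folds and 4 parallel fits are illustrative choices, not something from the Spark docs or this thread.

```python
def make_parallel_cv(estimator, param_maps, evaluator, num_folds=3, parallelism=4):
    """Build a CrossValidator that fits up to `parallelism` models at once.

    Hypothetical convenience wrapper; only the CrossValidator call is Spark API.
    """
    # Imported lazily so the helper can be defined without Spark on the path.
    from pyspark.ml.tuning import CrossValidator

    return CrossValidator(
        estimator=estimator,
        estimatorParamMaps=param_maps,
        evaluator=evaluator,
        numFolds=num_folds,
        # Number of parameter settings evaluated concurrently (default is 1, i.e. serial).
        parallelism=parallelism,
    )
```

With the objects from the question this would be used as `crossval = make_parallel_cv(pipeline, paramGrid, BinaryClassificationEvaluator())` followed by `cvModel = crossval.fit(training)`. Each individual fit is still a distributed Spark job; the Param only controls how many of those jobs are submitted at the same time.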

Spark < 2.3

It doesn't. Cross-validation is implemented as a plain nested for loop:

for i in range(nFolds):
    ...
    for j in range(numModels):
        ...

Only the training of each individual model is distributed.
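The difference between the two behaviours can be sketched in plain Python. This is a simplified illustration, not Spark's actual code: `fit_and_score` is a hypothetical callable standing in for "fit one model on the training fold and evaluate it on the validation fold", and a thread pool stands in for the driver-side concurrency that, to my understanding, Spark 2.3+ uses to launch several fits at once.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def cross_validate(folds, param_maps, fit_and_score, parallelism=1):
    """Return the average metric for each parameter setting.

    folds:         list of (train, validation) pairs
    param_maps:    list of parameter settings
    fit_and_score: hypothetical callable (train, validation, params) -> float
    parallelism:   1 reproduces the serial nested loop; >1 fits settings concurrently
    """
    totals = [0.0] * len(param_maps)
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        # Outer loop over folds stays serial in both variants.
        for train, validation in folds:
            task = partial(fit_and_score, train, validation)
            # Inner loop over parameter settings: one at a time when
            # parallelism == 1, otherwise up to `parallelism` at once.
            for j, score in enumerate(pool.map(task, param_maps)):
                totals[j] += score
    return [total / len(folds) for total in totals]
```

In real Spark each call to `fit_and_score` would itself be a distributed job, which is why threads on the driver are enough: they only wait for the cluster to finish.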

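【Comments】:

    【Answer 2】:

    I found the answer. As the other user said, the process is not parallelized; it runs serially. However, there is a spark_sklearn module that can be used for this grid search. It distributes the grid search, but it does not distribute the model building, so that is the trade-off.

    Here is the code using spark_sklearn's GridSearchCV: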

    %pyspark
    
    """
    DATA - https://kdd.ics.uci.edu/databases/20newsgroups/mini_newsgroups.tar.gz
    METHOD 1 - USING GRIDSEARCH CV FROM SPARK_SKLEARN MODULE BY DATABRICKS
    DOCUMENTATION - https://databricks.com/blog/2016/02/08/auto-scaling-scikit-learn-with-apache-spark.html
    THIS IS DISTRIBUTED OPERATION AS MENTIONED ON THE WEBSITE
    """
    from spark_sklearn import GridSearchCV
    from pyspark.ml.feature import HashingTF,StopWordsRemover,IDF,Tokenizer
    from pyspark.ml import Pipeline
    from pyspark.sql.types import StructField, StringType, StructType
    from pyspark.ml.feature import IndexToString, StringIndexer
    from spark_sklearn.converter import Converter
    from sklearn.pipeline import Pipeline as S_Pipeline
    from sklearn.ensemble import RandomForestClassifier as S_RandomForestClassifier
    
    path = 's3://sparkzepellin/mini_newsgroups//*'
    news = sc.wholeTextFiles(path)
    print("Total number of documents =", news.count())
    
    # print 5 samples
    news.takeSample(False,5, 1)
    
    # Using sqlContext, create a DataFrame with three string columns
    schema = ["id", "text", "topic"]
    fields = [StructField(field_name, StringType(), True) for field_name in schema]
    schema = StructType(fields)
    
    # Build an RDD of (file name, text, topic) tuples, then apply the schema declared above
    newsgroups = news.map(lambda kv: (kv[0].split("/")[-1], kv[1], kv[0].split("/")[-2]))
    df = sqlContext.createDataFrame(newsgroups, schema)
    
    df_new = StringIndexer(inputCol="topic", outputCol="label").fit(df).transform(df)
    
    # Build a pipeline with Tokenizer, HashingTF, and IDF (the RandomForest is added in the sklearn pipeline below)
    tokenizer = Tokenizer().setInputCol("text").setOutputCol("words")
    hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures")
    idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
    
    pipeline=Pipeline(stages=[tokenizer, hashingTF, idf])
    data = pipeline.fit(df_new).transform(df_new)
    
    # Using Converter, convert to pandas dataframe (numpy)
    # to run on distributed sklearn using spark_sklearn package
    converter = Converter(sc)
    new_df = converter.toPandas(data.select(data.features.alias("text"), "label"))
    
    # Sklearn pipeline
    s_pipeline = S_Pipeline([
                ('rf', S_RandomForestClassifier())
            ])
    
    # Parameter grid to search over
    parameters = {
        'rf__n_estimators': (10, 20),
        'rf__max_depth': (2, 10)
    }
    
    # Run GridSearchCV using the above defined parameters on the pipeline created
    gridSearch = GridSearchCV(sc, s_pipeline, parameters)
    GS = gridSearch.fit(new_df.text.values, new_df.label.values)
    

    Another approach is to use the map method to parallelize the operation and get back metrics such as accuracy.
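A rough sketch of that map-based idea, under stated assumptions: `grid_search_with_map` and `score_fn` are hypothetical names, `score_fn` is assumed to be a picklable function that trains a single-machine model (for example a scikit-learn estimator on broadcast data) for one parameter setting and returns its accuracy, and `param_grid` is assumed to be non-empty. Only `parallelize`, `map`, and `collect` are Spark API.

```python
def grid_search_with_map(sc, param_grid, score_fn):
    """Score every parameter setting on the cluster and pick the best one.

    sc:         an active SparkContext
    param_grid: non-empty list of parameter dicts
    score_fn:   hypothetical callable, params -> float (higher is better)
    """
    # One partition per setting so each setting can run as its own task.
    settings = sc.parallelize(param_grid, numSlices=len(param_grid))
    # Each task trains and scores one model; only (params, score) pairs come back.
    results = settings.map(lambda params: (params, score_fn(params))).collect()
    best_params, best_score = max(results, key=lambda pair: pair[1])
    return best_params, best_score, results
```

As with spark_sklearn, this distributes the search over parameter settings but not the training of each model, so the data must fit on a single executor.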

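    【Comments】:

    • This solution is not good. It parallelizes the CV, but it does not parallelize the learning. You can only parallelize one of the two processes: the learning or the cross-validation.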