【问题标题】:Passing variables between stages in custom ML pipeline在自定义 ML 管道中的阶段之间传递变量
【发布时间】:2018-08-06 22:37:42
【问题描述】:

我想从数据框中删除一些列,然后应用 ML 算法。我通过构建 2 个单独的管道来做到这一点。我的问题是如何将两条管道合并到一个管道中?

#######################
from typing import Iterable
import pandas as pd
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.sql import DataFrame
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
#######################

#Custom Class
#######################
class ColumnDropper_test(Transformer):
    def __init__(self, banned_list: Iterable[str]):
        super().__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.drop(
            *[x for x in df.columns if any(y in x for y in self.banned_list)])

        return df
#######################

#Sample Data
#######################
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'banned_me': [14, 15, 16, 17],
    'target': [21, 31, 41, 51]
})

df = spark.createDataFrame(data)
#######################

# First Pipeline
#######################
column_dropper = ColumnDropper_test(banned_list=['banned_me'])

model = Pipeline(stages=[column_dropper]).fit(df).transform(df)
#######################

#Second Pipeline(Question: Add the block of code below to the above pipeline)
#########################

ready = [col for col in model.columns if col != 'target']
assembler = VectorAssembler(inputCols=ready, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='target')

model_2 = Pipeline(stages=[assembler,dtc])

train_data, test_data = model.randomSplit([0.5,0.5])
fit_model = model_2.fit(train_data)
results = fit_model.transform(test_data)   
results.select('features','Prediction').show()

我发现的挑战在于上述代码中的变量ready。由于调用column_droppermodel.columns 会有所不同(列数更少),因此使用(df.columns)将其添加到同一管道将导致以下错误,因为banned_me 已被原始数据删除.

#Combining both Pipelines failed attempt
model = Pipeline(stages=[column_dropper,assembler,dtc]).fit(df).transform(df)

调用 o188.transform 时出错。 : java.lang.IllegalArgumentException:字段“banned_me”不存在。 可用字段:ball_column、keep_column、hall_column、target

我最初的建议是创建一个从ColumnDropper_testclass 继承df.columns 的新变量的新类。我们如何才能让Pipeline 中的assembler 阶段从column_dropper 阶段看到新的df,而不是查看原来的df

【问题讨论】:

  • 非常有趣的问题。我想出的一个丑陋的解决方案是嵌套Transformers,即将汇编器放在ColumnDropper 类中。但必须有更好的解决方案。我确实偶然发现了this,但是,自定义Transformer 没有getOutputCols 方法。因此,对于一个好的解决方案,我们必须找到一种方法来实现它(我认为)。我还没有找到如何做到这一点,很想看看其他人的解决方法。

标签: python apache-spark pyspark


【解决方案1】:

您必须创建一个继承VectorAssembler 的自定义类来自动设置inputCols

from pyspark import keyword_only

class CustomVecssembler(VectorAssembler):
    @keyword_only
    def __init__(self, outputCol='features'):
        super(CustomVecssembler, self).__init__()
        self.transformer = VectorAssembler(outputCol=outputCol)
        if spark.version.startswith('2.1'):
            kwargs = self.__init__._input_kwargs
        else:
            kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, outputCol='features'):
        if spark.version.startswith('2.1'):
            kwargs = self.__init__._input_kwargs
        else:
            kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, df):
        ready = [col for col in df.columns if col != 'target']
        self.setInputCols(ready)
        self.transformer.setInputCols(ready)
        df = self.transformer.transform(df)
        return df

验证它是否有效:

# prep dataset
data = pd.DataFrame({
    'ball_column': [0, 1, 2, 3],
    'keep_column': [7, 8, 9, 10],
    'hall_column': [14, 15, 16, 17],
    'banned_me': [14, 15, 16, 17],
    'target': [21, 31, 41, 51]
})
df = spark.createDataFrame(data)

# ORIGINAL IMPLEMENTATION
column_dropper = ColumnDropper_test(banned_list=['banned_me'])
model = Pipeline(stages=[column_dropper]).fit(df).transform(df)

ready = [col for col in model.columns if col != 'target']
assembler = VectorAssembler(inputCols=ready, outputCol='features')
dtc = DecisionTreeClassifier(featuresCol='features', labelCol='target')

model_2 = Pipeline(stages=[assembler, dtc])

train_data, test_data = model.randomSplit([0.5, 0.5])
fit_model = model_2.fit(train_data)
results = fit_model.transform(test_data)
results.select('features','Prediction').show()

# +--------------+----------+
# |      features|Prediction|
# +--------------+----------+
# |[1.0,15.0,8.0]|      51.0|
# |[2.0,16.0,9.0]|      51.0|
# +--------------+----------+

# USING CUSTOM VEC ASSEMBLER
new_assembler = CustomVecssembler(outputCol='features')
new_pipeline = Pipeline(stages=[column_dropper, new_assembler, dtc]).fit(train_data)
new_results = new_pipeline.transform(test_data)
new_results.select('features', 'Prediction').show()

# +--------------+----------+
# |      features|Prediction|
# +--------------+----------+
# |[1.0,15.0,8.0]|      51.0|
# |[2.0,16.0,9.0]|      51.0|
# +--------------+----------+

【讨论】:

  • @Florian 不客气!总有新东西要向 SO 学习!
  • @Scratch'N'Purr 做得很好!谢谢你。另一方面,您是否认为检查点/持久化/缓存df 用于多个操作是一个好主意,因为同一示例的类数量增加了?例如,如果我们要包含一个用于更改列名、选择特定类型的列和其他预处理步骤的类。
  • @Matthew 没问题!对于您的问题,我真的认为不需要保留df,因为管道中的后续转换步骤不会真正使用缓存的df - 它们会使用上一步转换后的数据帧。我可以看到持久化 df 的唯一价值的情况是,当您有多个管道要应用 df 时。
  • @Matthew 不幸的是,这是使用 python 编写自定义转换器的问题之一,因为转换器最初是用 Java 编写的。我还没有找到一个干净的解决方案,转换器可以从 Java 类访问属性/方法,但如果我找到解决方案,我会告诉你。
  • @Matthew 我在课堂上添加了一些其他的编辑。显然,我在尝试使用转换器时遇到了 'function' object has no attribute '_input_kwargs' 错误,并且我还使用 super(Parent, self).__init__() 初始化程序添加了对 python 2.7 的向后兼容性。有趣的是,这个解决方案现在非常hacky,因为它不仅继承自VectorAssembler,而且还包装了一个类的实例。
猜你喜欢
  • 2019-02-17
  • 2018-12-06
  • 2017-10-21
  • 2021-02-20
  • 1970-01-01
  • 1970-01-01
  • 2017-02-11
  • 1970-01-01
  • 2020-03-29
相关资源
最近更新 更多