【问题标题】:E-num / get Dummies in pysparkE-num / 在 pyspark 中获取 Dummies
【发布时间】:2017-03-15 09:26:35
【问题描述】:

我想在 PYSPARK 中创建一个函数来获取数据框和参数列表(代码/分类特征)并返回带有额外虚拟列的数据框,例如列表中特征的类别 PFA DF 之前和之后: before and After data frame- Example

python中的代码是这样的:

enum = ['column1','column2']

for e in enum:
    print e
    temp = pd.get_dummies(data[e],drop_first=True,prefix=e)
    data = pd.concat([data,temp], axis=1)
    data.drop(e,axis=1,inplace=True)

data.to_csv('enum_data.csv')

【问题讨论】:

    标签: pyspark pyspark-sql


    【解决方案1】:

    首先,您需要收集 TYPESCODE 的不同值。然后使用withColumn 选择添加每个值的名称的列,或者使用每个列的选择。 下面是使用 select 语句的示例代码:-

    import pyspark.sql.functions as F
    df = sqlContext.createDataFrame([
        (1, "A", "X1"),
        (2, "B", "X2"),
        (3, "B", "X3"),
        (1, "B", "X3"),
        (2, "C", "X2"),
        (3, "C", "X2"),
        (1, "C", "X1"),
        (1, "B", "X1"),
    ], ["ID", "TYPE", "CODE"])
    
    types = df.select("TYPE").distinct().rdd.flatMap(lambda x: x).collect()
    codes = df.select("CODE").distinct().rdd.flatMap(lambda x: x).collect()
    types_expr = [F.when(F.col("TYPE") == ty, 1).otherwise(0).alias("e_TYPE_" + ty) for ty in types]
    codes_expr = [F.when(F.col("CODE") == code, 1).otherwise(0).alias("e_CODE_" + code) for code in codes]
    df = df.select("ID", "TYPE", "CODE", *types_expr+codes_expr)
    df.show()
    

    输出

    +---+----+----+--------+--------+--------+---------+---------+---------+
    | ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
    +---+----+----+--------+--------+--------+---------+---------+---------+
    |  1|   A|  X1|       1|       0|       0|        1|        0|        0|
    |  2|   B|  X2|       0|       1|       0|        0|        1|        0|
    |  3|   B|  X3|       0|       1|       0|        0|        0|        1|
    |  1|   B|  X3|       0|       1|       0|        0|        0|        1|
    |  2|   C|  X2|       0|       0|       1|        0|        1|        0|
    |  3|   C|  X2|       0|       0|       1|        0|        1|        0|
    |  1|   C|  X1|       0|       0|       1|        1|        0|        0|
    |  1|   B|  X1|       0|       1|       0|        1|        0|        0|
    +---+----+----+--------+--------+--------+---------+---------+---------+
    

    【讨论】:

      【解决方案2】:

      Freek Wiemkeijer 和 Rakesh Kumar 提供的解决方案非常合适,但是,由于我已对其进行了编码,因此我认为值得发布此通用解决方案,因为它不需要对列名进行硬编码。

      pivot_cols = ['TYPE','CODE']
      keys = ['ID','TYPE','CODE']
      
      before = sc.parallelize([(1,'A','X1'),
                               (2,'B','X2'),
                               (3,'B','X3'),
                               (1,'B','X3'),
                               (2,'C','X2'),
                               (3,'C','X2'),
                               (1,'C','X1'),
                               (1,'B','X1')]).toDF(['ID','TYPE','CODE'])                         
      
      #Helper function to recursively join a list of dataframes
      #Can be simplified if you only need two columns
      def join_all(dfs,keys):
          if len(dfs) > 1:
              return dfs[0].join(join_all(dfs[1:],keys), on = keys, how = 'inner')
          else:
              return dfs[0]
      
      dfs = []
      combined = []
      for pivot_col in pivot_cols:
          pivotDF = before.groupBy(keys).pivot(pivot_col).count()
          new_names = pivotDF.columns[:len(keys)] +  ["e_{0}_{1}".format(pivot_col, c) for c in pivotDF.columns[len(keys):]]        
          df = pivotDF.toDF(*new_names).fillna(0)    
          combined.append(df)
      
      join_all(combined,keys).show()
      

      这作为输出:

      +---+----+----+--------+--------+--------+---------+---------+---------+
      | ID|TYPE|CODE|e_TYPE_A|e_TYPE_B|e_TYPE_C|e_CODE_X1|e_CODE_X2|e_CODE_X3|
      +---+----+----+--------+--------+--------+---------+---------+---------+
      |  1|   A|  X1|       1|       0|       0|        1|        0|        0|
      |  2|   C|  X2|       0|       0|       1|        0|        1|        0|
      |  3|   B|  X3|       0|       1|       0|        0|        0|        1|
      |  2|   B|  X2|       0|       1|       0|        0|        1|        0|
      |  3|   C|  X2|       0|       0|       1|        0|        1|        0|
      |  1|   B|  X3|       0|       1|       0|        0|        0|        1|
      |  1|   B|  X1|       0|       1|       0|        1|        0|        0|
      |  1|   C|  X1|       0|       0|       1|        1|        0|        0|
      +---+----+----+--------+--------+--------+---------+---------+---------+
      

      【讨论】:

      • 我正在尝试在大型数据帧上运行此代码,并且需要很长时间。我是 spark 的超级新手,您认为此代码不适合大型数据集吗?如果是,有没有办法提高性能。
      • 您尝试过 Kumars 解决方案吗?如果你有很多列,这个解决方案不会特别快。
      • 我需要能够在没有硬编码的情况下在很多列上执行此操作,这就是我使用您的解决方案的原因,有没有办法在没有硬编码的情况下运行 Kumars 解决方案?
      • 当我尝试运行 df.show() 时,它没有显示所有的虚拟列
      【解决方案3】:

      我一直在寻找相同的解决方案,但是是 scala,也许这会对某人有所帮助:

      val list = df.select("category").distinct().rdd.map(r => r(0)).collect()
      val oneHotDf = list.foldLeft(df)((df, category) => finalDf.withColumn("category_" + category, when(col("category") === category, 1).otherwise(0)))
      

      【讨论】:

      • (问题是关于python的。这篇文章似乎没有为问题提供quality answer)。
      • 我认为,如果有人分享知识,而您唯一能做的就是从 ysegals 的回答中获利,这总是双赢的。我不喜欢人们因为他人认为他们必须严格遵守某些规则集(无论上下文或情况如何)而对他们的工作感到气馁。
      【解决方案4】:

      如果你想获取PySpark版本的pandas“pd.get_dummies”函数,你可以使用以下函数:

      import itertools
      
      def spark_get_dummies(df):
          
          categories = []
          for i, values in enumerate(df.columns):
              categories.append(df.select(values).distinct().rdd.flatMap(lambda x: x).collect())
              
          expressions = []
          for i, values in enumerate(df.columns):
              expressions.append([F.when(F.col(values) == i, 1).otherwise(0).alias(str(values) + "_" + str(i)) for i in categories[i]])
          
          expressions_flat = list(itertools.chain.from_iterable(expressions))
          
          df_final = df.select(*expressions_flat)
          
          return df_final
      

      可重现的例子是:

      df = sqlContext.createDataFrame([
          ("A", "X1"),
          ("B", "X2"),
          ("B", "X3"),
          ("B", "X3"),
          ("C", "X2"),
          ("C", "X2"),
          ("C", "X1"),
          ("B", "X1"),
      ], ["TYPE", "CODE"])
      
      dummies_df = spark_get_dummies(df)
      dummies_df.show()
      

      你会得到:

      【讨论】:

        【解决方案5】:

        第一步是从您的 CSV 文件中创建一个DataFrame

        Get CSV to Spark dataframe;第一个答案是逐行举例。

        然后您可以添加列。假设您有一个名为 dfDataFrame 对象,列是:[ID, TYPE, CODE]。

        其余的货车用DataFrame.withColumn()pyspark.sql.functions.when修复:

        from pyspark.sql.functions import when
        
        df_with_extra_columns = df.withColumn("e_TYPE_A", when(df.TYPE == "A", 1).otherwise(0).withColumn("e_TYPE_B", when(df.TYPE == "B", 1).otherwise(0)
        

        (这会添加前两列。你明白了。)

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2017-09-09
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-05-25
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多