【问题标题】:Pyspark create new column based if a column isin another Spark DataframePyspark 根据列是否在另一个 Spark Dataframe 中创建新列
【发布时间】:2021-09-21 10:32:23
【问题描述】:

如果列的行位于单独的 Dataframe 中,我正在尝试在我的 Spark Dataframe 中创建一个标志。

这是我的主要 Spark Dataframe (df_main)

+--------+
|main    |
+--------+
|28asA017|
|03G12331|
|1567L044|
|02TGasd8|
|1asd3436|
|A1234567|
|B1234567|
+--------+

这是我的参考 (df_ref),这个参考中有数百行,所以我显然不能像 solutionthis one 这样对它们进行硬编码

+--------+
|mask_vl |
+--------+
|A1234567|
|B1234567|
...
+--------+

通常,我会在 pandas 的数据框中执行以下操作:

df_main['is_inref'] = np.where(df_main['main'].isin(df_ref.mask_vl.values), "YES", "NO")

这样我就能得到这个

+--------+--------+
|main |is_inref|
+--------+--------+
|28asA017|NO      |
|03G12331|NO      |
|1567L044|NO      |
|02TGasd8|NO      |
|1asd3436|NO      |
|A1234567|YES     |
|B1234567|YES     |
+--------+--------+

我试过下面的代码,但我不明白图片中的错误是什么意思。

df_main = df_main.withColumn('is_inref', "YES" if F.col('main').isin(df_ref) else "NO")
df_main.show(20, False)

【问题讨论】:

    标签: python dataframe pyspark rdd data-wrangling


    【解决方案1】:

    你很接近。我认为您需要的额外步骤是显式创建将包含来自df_ref 的值的列表。

    请看下图:

    # Create your DataFrames
    df = spark.createDataFrame(["28asA017","03G12331","1567L044",'02TGasd8','1asd3436','A1234567','B1234567'], "string").toDF("main")
    df_ref =  spark.createDataFrame(["A1234567","B1234567"], "string").toDF("mask_vl")
    

    然后,您可以创建一个list 并使用isin,就像您拥有它一样:

    # Imports
    from pyspark.sql.functions import col, when
    
    # Create a list with the values of your reference DF
    mask_vl_list = df_ref.select("mask_vl").rdd.flatMap(lambda x: x).collect()
    
    # Use isin to check whether the values in your column exist in the list
    df_main = df_main.withColumn('is_inref', when(col('main').isin(mask_vl_list), 'YES').otherwise('NO'))
    

    这会给你:

    >>> df_main.show()
    
    +--------+--------+
    |    main|is_inref|
    +--------+--------+
    |28asA017|      NO|
    |03G12331|      NO|
    |1567L044|      NO|
    |02TGasd8|      NO|
    |1asd3436|      NO|
    |A1234567|     YES|
    |B1234567|     YES|
    +--------+--------+
    

    【讨论】:

    • 感谢@sophocles 的快速回复。我是否正确理解第一个命令(rdd.flatMap(...).collect())基本上是将数据帧转换为列表到主驱动程序节点?如果是这样,如果引用变得很大,我会不会遇到内存不足的异常?
    • 欢迎。是的,你是对的。我认为您不会遇到内存异常问题,因为这是一种有效的方法。您可以查看更多信息here,了解有关将列转换为列表的性能基准测试。
    • 我猜 collect() 不是最好的解决方案。如果 mask_v1 数据框会变大,那就是个问题了。
    【解决方案2】:

    如果你想避免收集,我建议你做下一步:

    df_ref= df_ref
              .withColumnRenamed("mask_v1", "main")
              .withColumn("isPreset", lit("yes"))
          
     main_df= main_df.join(df_ref, Seq("main"), "left_outer")
              .withColumn("is_inref", when(col("isPresent").isNull,
              lit("NO")).otherwise(lit("YES")))
    

    【讨论】:

      【解决方案3】:

      我想这个问题已经回答了,你可以在这里查看 spark detecting the unchanged rows

      【讨论】:

        猜你喜欢
        • 2023-02-01
        • 1970-01-01
        • 1970-01-01
        • 2021-06-12
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-02-14
        • 1970-01-01
        相关资源
        最近更新 更多