【问题标题】:flatMap doesn't preserve order when creating lists from pyspark dataframe columns从 pyspark 数据框列创建列表时,flatMap 不保留顺序
【发布时间】:2018-01-25 09:30:43
【问题描述】:

我有一个 PySpark 数据框 df:

+---------+------------------+
|ceil_temp|             test2|
+---------+------------------+
|       -1|[6397024, 6425417]|
|        0|[6397024, 6425417]|
|        0|[6397024, 6425417]|
|        0|[6469640, 6531963]|
|        0|[6469640, 6531963]|
|        1|[6469640, 6531963]|
+---------+------------------+

我最终想根据 ceil_temp 列的索引向该数据帧添加一个新列(最终),其值是 test2 列中列表的元素。例如:如果 ceil_temp 列中有

+---------+------------------+--------
|ceil_temp|             test2|final  |
+---------+------------------+--------
|       -1|[6397024, 6425417]|6397024|
|        0|[6397024, 6425417]|6397024|
|        0|[6397024, 6425417]|6397024|
|        0|[6469640, 6531963]|6469640|
|        0|[6469640, 6531963]|6469640|
|        1|[6469640, 6531963]|6531963|
+---------+------------------+--------

为此,我尝试使用 flatMap 将 ceil_temp 和 test2 提取为列表:

m =df.select("ceil_temp").rdd.flatMap(lambda x: x).collect()
q= df.select("test2").rdd.flatMap(lambda x: x).collect()

l=[]
for i in range(len(num)):
    if m[i]<0:
        m[i]=0
    else:
        pass
    l.append(q[i][m[i]])

然后将此列表 l 转换为新的 df 并将其与基于我基于窗口函数添加的行索引列的原始数据框连接:

w = Window().orderBy()
df=df.withColumn("columnindex", rowNumber().over(w)).

但是,flatMap 提取的列表的顺序似乎与父数据框 df 的顺序不同。我得到以下信息:

m=[-1,0,0,0,0,1]
q=[[6469640, 6531963],[6469640, 6531963],[6469640, 6531963],[6397024, 6425417],[6397024, 6425417],[6397024, 6425417]]

预期结果:

m=[-1,0,0,0,0,1]
q=[[6397024, 6425417],[6397024, 6425417],[6397024, 6425417],[6469640, 6531963],[6469640, 6531963],[6469640, 6531963]]

请告知如何实现“最终”列。

【问题讨论】:

    标签: python apache-spark pyspark spark-dataframe apache-spark-1.6


    【解决方案1】:

    我认为您可以在数据框的行上使用 UDF 来实现您想要的结果。

    然后您可以 withColumn 使用您的 udf 的结果。

    val df = spark.sparkContext.parallelize(List(
      (-1, List(6397024, 6425417)),
      (0,List(6397024, 6425417)),
      (0,List(6397024, 6425417)),
      (0,List(6469640, 6531963)),
      (0,List(6469640, 6531963)),
      (1,List(6469640, 6531963)))).toDF("ceil_temp", "test2")
    
    import org.apache.spark.sql.functions.udf
    val selectRightElement = udf {
      (ceilTemp: Int, test2: Seq[Int]) => {
        // dummy code for the example
        if (ceilTemp <= 0) test2(0) else test2(1)
      }
    }
    
    df.withColumn("final", selectRightElement(df("ceil_temp"), df("test2"))).show
    

    这样做可以防止你的行顺序被打乱。

    【讨论】:

    • 谢谢!我对scala语法不太熟悉,但是我如何遍历包含(整数)数组的列并在python中返回特定元素? (这部分: if (ceilTemp
    • 任何关于 python 的好的参考都应该可以帮助你。它超出了有关 spark 数据帧的原始问题的范围。
    【解决方案2】:

    我通过以下方式解决了上述问题:

    df=df.withColumn("final",(df.test2).getItem(df.ceil_temp))
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-06-21
      • 1970-01-01
      • 1970-01-01
      • 2019-03-16
      • 2021-02-09
      • 2016-09-25
      • 2022-01-03
      相关资源
      最近更新 更多