【Question title】: Fill null values in a row with frequency of other column
【Posted】: 2021-11-01 21:52:17
【Question】:

In a Spark Structured Streaming context, I have this DataFrame:

+------+----------+---------+
|brand |Timestamp |frequency|
+------+----------+---------+
|BR1   |1632899456|4        |
|BR1   |1632901256|4        |
|BR300 |1632901796|null     |
|BR300 |1632899155|null     |
|BR90  |1632901743|1        |
|BR1   |1632899933|4        |
|BR1   |1632899756|4        |
|BR22  |1632900776|null     |
|BR22  |1632900176|null     |
+------+----------+---------+

I want to replace the null values with the frequency of the brand within the batch, to get a DataFrame like this:

+------+----------+---------+
|brand |Timestamp |frequency|
+------+----------+---------+
|BR1   |1632899456|4        |
|BR1   |1632901256|4        |
|BR300 |1632901796|2        | 
|BR300 |1632899155|2        |
|BR90  |1632901743|1        |
|BR1   |1632899933|4        |
|BR1   |1632899756|4        |
|BR22  |1632900776|2        |
|BR22  |1632900176|2        |
+------+----------+---------+

I am using Spark 2.4.3 with SQLContext, and the Scala language.

【Question comments】:

    Tags: scala apache-spark spark-streaming spark-structured-streaming


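    【Answer 1】:

    Hi bro, I'm a Java programmer. The best way is to loop through the frequency column and find the first null and its brand. Then count the occurrences of that brand through to the end of the table and correct the null values for that brand, then pick the next brand with a null and correct it. Here is my Java solution (I haven't tested this code, I just wrote it in a text editor, but I hope it runs well, 70% ;)):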

        public class FillFrequency {
            public static void main(String[] args) {
                // The table from the question: {brand, timestamp, frequency}.
                // A null frequency marks a value that still has to be filled in.
                Object[][] table = {
                    {"BR1", 1632899456L, 4},
                    {"BR1", 1632901256L, 4},
                    {"BR300", 1632901796L, null},
                    {"BR300", 1632899155L, null},
                    {"BR90", 1632901743L, 1},
                    {"BR1", 1632899933L, 4},
                    {"BR1", 1632899756L, 4},
                    {"BR22", 1632900776L, null},
                    {"BR22", 1632900176L, null}
                };

                for (int i = 0; i < table.length; i++) {
                    if (table[i][2] != null) {
                        continue; // frequency already present, nothing to fix
                    }
                    String brand = (String) table[i][0];

                    // Count how many rows of the table carry this brand.
                    int repeatCounter = 0;
                    for (int n = 0; n < table.length; n++) {
                        if (brand.equals(table[n][0])) {
                            repeatCounter++;
                        }
                    }

                    // Replace the null frequencies of this brand with the count.
                    for (int p = i; p < table.length; p++) {
                        if (brand.equals(table[p][0]) && table[p][2] == null) {
                            table[p][2] = repeatCounter;
                        }
                    }
                }

                // Print the corrected table.
                for (Object[] row : table) {
                    System.out.println(row[0] + " " + row[1] + " " + row[2]);
                }
            }
        }
    

    【Comments】:

      【Answer 2】:

      Use "count" over a window function:

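      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.{count, when}
      import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

      val df = Seq(
        ("BR1", 1632899456, Some(4)),
        ("BR1", 1632901256, Some(4)),
        ("BR300", 1632901796, None),
        ("BR300", 1632899155, None),
        ("BR90", 1632901743, Some(1)),
        ("BR1", 1632899933, Some(4)),
        ("BR1", 1632899756, Some(4)),
        ("BR22", 1632900776, None),
        ("BR22", 1632900176, None)
      ).toDF("brand", "Timestamp", "frequency")
      
      // One window per brand; counting over it gives the number of rows for that brand.
      val brandWindow = Window.partitionBy("brand")
      val result = df.withColumn(
        "frequency",
        when($"frequency".isNotNull, $"frequency")
          .otherwise(count($"brand").over(brandWindow))
      )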
      

      Result:

      +-----+----------+---------+
      |brand|Timestamp |frequency|
      +-----+----------+---------+
      |BR1  |1632899456|4        |
      |BR1  |1632901256|4        |
      |BR1  |1632899933|4        |
      |BR1  |1632899756|4        |
      |BR22 |1632900776|2        |
      |BR22 |1632900176|2        |
      |BR300|1632901796|2        |
      |BR300|1632899155|2        |
      |BR90 |1632901743|1        |
      +-----+----------+---------+
      

      Solution with GroupBy:

      val countDF = df.select("brand").groupBy("brand").count()
      
      
      df.alias("df")
        .join(countDF.alias("cnt"), Seq("brand"))
        .withColumn("frequency", when($"df.frequency".isNotNull, $"df.frequency").otherwise($"cnt.count"))
        .select("df.brand", "df.Timestamp", "frequency")
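
To make clear what the GroupBy join computes, here is a minimal sketch of the same count-then-fill logic over plain Scala collections, with no Spark involved. The names `BrandRow` and `FillFrequencyPlain` are hypothetical and exist only for this illustration.

```scala
// Hypothetical row type, for illustration only (not a Spark class).
final case class BrandRow(brand: String, timestamp: Long, frequency: Option[Int])

object FillFrequencyPlain {
  def fill(rows: Seq[BrandRow]): Seq[BrandRow] = {
    // Counterpart of groupBy("brand").count(): number of rows per brand.
    val counts: Map[String, Int] =
      rows.groupBy(_.brand).map { case (brand, group) => brand -> group.size }

    // Counterpart of the join plus when/otherwise:
    // keep an existing frequency, otherwise use the brand's row count.
    rows.map(r => r.copy(frequency = r.frequency.orElse(counts.get(r.brand))))
  }
}
```

Rows that already have a frequency are left untouched; only missing values are replaced by the number of rows sharing the brand.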
      

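      【Comments】:

      • It seems Window.partitionBy is not supported in the Spark Structured Streaming context. I get this message: org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;;
      • Yes, this does not work for streaming; I will delete the answer.
      • Is there any solution for the streaming context?
      • Edited the answer and added the "GroupBy" solution.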
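
For the streaming case raised in these comments, one option that is not part of the original answer is `foreachBatch`, which exists on `DataStreamWriter` since Spark 2.4. Inside it each micro-batch is an ordinary DataFrame, so the non-time-based window should be accepted there. The sketch below is untested against the asker's pipeline: `streamingDF`, the object name, and printing with `show` in place of a real sink are all assumptions made for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, col, count}
import org.apache.spark.sql.streaming.StreamingQuery

object FillFrequencyPerBatch {
  // Works on a static DataFrame: keeps an existing frequency,
  // otherwise uses the number of rows with the same brand in this batch.
  def fillFrequency(batch: DataFrame): DataFrame = {
    val brandWindow = Window.partitionBy("brand")
    batch.withColumn(
      "frequency",
      coalesce(col("frequency"), count(col("brand")).over(brandWindow))
    )
  }

  // Explicitly typed function value, so the Scala overload of foreachBatch is chosen.
  val handleBatch: (DataFrame, Long) => Unit =
    (batch: DataFrame, batchId: Long) => fillFrequency(batch).show(false)

  // `streamingDF` is a placeholder for the asker's streaming DataFrame;
  // showing each batch stands in for whatever sink is really used.
  def start(streamingDF: DataFrame): StreamingQuery =
    streamingDF.writeStream.foreachBatch(handleBatch).start()
}
```

The counts are per micro-batch only, which matches the "frequency of the brand in the batch" wording of the question; a brand split across two batches would be counted separately in each.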