【问题标题】:How to add serial number for each group in Spark DataFrame如何在 Spark DataFrame 中为每个组添加序列号
【发布时间】:2018-11-13 12:47:02
【问题描述】:

我正在尝试根据以下条件向我的 Spark DataFrame 添加一个带有序列号的列:我想为其中一列中的每个组分配顺序整数。

我尝试了很多方法,但似乎没有任何效果。

df<-data.frame(location=c("a","a","d","d","d"),
               device_id=c(123,3455,234,4565,675),
               expected_column=c(1,2,1,2,3))

#final_data_visitor is my spark Data Frame 
random_data<-final_data_visitor

random_data<-random_data %>%
       group_by(location_id,device_subscriber_id)%>%                
       mutate(visit_seq=1:n())

random_data
# error is "Error in from:to : NA/NaN argument
In addition: Warning message:
In 1:n() : NAs introduced by coercion"

这是我尝试执行代码时遇到的错误:

"from:to 中的错误:NA/NaN 参数 另外:警告信息: 在 1:n() 中:强制引入的 NAs

【问题讨论】:

    标签: r apache-spark dplyr apache-spark-sql sparklyr


    【解决方案1】:

    您可以使用row_number 窗口函数,但要能够应用它,您需要某种形式的排序。例如,如果数据定义如下

    set.seed(1)
    df <- copy_to(sc, tibble(group=rep(c("a", "b"), 3), value=runif(6)))
    

    你可以

    df %>% 
      group_by(group) %>% 
      arrange(value, .by_group=TRUE) %>%  
      mutate(r = row_number())
    
    # Source:     lazy query [?? x 3]
    # Database:   spark_connection
    # Groups:     group
    # Ordered by: value, TRUE
      group value     r
      <chr> <dbl> <int>
    1 b     0.372     1
    2 b     0.898     2
    3 b     0.908     3
    4 a     0.202     1
    5 a     0.266     2
    6 a     0.573     3
    

    如果没有预定义的排序,您可以尝试使用montonically_increasing_id 添加一个(请确保您首先了解 Spark 的排序语义),或者,如果您不关心用于分组的同一列的顺序:

    df %>% 
      group_by(group) %>% 
      arrange(group, .by_group=TRUE) %>%
      mutate(r = row_number())
    
    # Source:     lazy query [?? x 3]
    # Database:   spark_connection
    # Groups:     group
    # Ordered by: group, TRUE
      group value     r
      <chr> <dbl> <int>
    1 a     0.266     1
    2 a     0.573     2
    3 a     0.202     3
    4 b     0.372     1
    5 b     0.908     2
    6 b     0.898     3
    

    如果以这种方式应用,组中值的顺序将是不确定的。

    【讨论】:

    • 我收到一个错误“错误:org.apache.spark.sql.AnalysisException: cannot resolve 'value' given input columns:”
    • @YogeshKumar 显然列名应该反映您的实际数据。答案包含最小的可重现数据集,要在您自己的输入上运行它,您必须相应地调整它。
    猜你喜欢
    • 2021-06-11
    • 2015-12-23
    • 1970-01-01
    • 2020-02-07
    • 2016-08-27
    • 2016-01-07
    相关资源
    最近更新 更多