【问题标题】:Find dynamic intervals per group with Sparklyr使用 Sparklyr 查找每组的动态间隔
【发布时间】:2019-07-25 17:29:45
【问题描述】:

我有一个巨大的(约 100 亿行)data.frame,看起来有点像这样:

data <- data.frame(Person = c(rep("John", 9), rep("Steve", 7), rep("Jane", 4)),
Year = c(1900:1908, 1902:1908, 1905:1908),
Grade = c(c(6,3,4,4,8,5,2,9,7), c(4,3,5,5,6,4,7), c(3,7,2,9)) )

这是一组 3 个人,在不同的年份观察,我们有他们在该年份的成绩。我想创建一个变量,对于每个等级,它返回“简化等级”。简化的等级只是以不同的间隔切割的等级。 困难在于间隔因人而异。 要按人获取间隔阈值,我有以下列表:

list.threshold <- list(John = c(5,7), Steve = 4, Jane = c(3,5,8))

所以史蒂夫的成绩将被削减 2 个区间,而 Jane 的成绩将被削减 4 个区间。 这是想要的结果(SimpleGrade):

    Person  Year  Grade  SimpleGrade
1:   John   1900    6        1
2:   John   1901    3        0
3:   John   1902    4        0
4:   John   1903    4        0
5:   John   1904    8        2
6:   John   1905    5        1
7:   John   1906    2        0
8:   John   1907    9        2
9:   John   1908    7        2
10:  Steve  1902    4        1
11:  Steve  1903    3        0
12:  Steve  1904    5        1
13:  Steve  1905    5        1
14:  Steve  1906    6        1
15:  Steve  1907    4        1
16:  Steve  1908    7        1
17:  Jane   1905    3        1
18:  Jane   1906    7        2
19:  Jane   1907    2        0
20:  Jane   1908    9        3

我必须在 sparklyr 中找到解决方案,因为我正在使用一个巨大的 spark 表。

在 dplyr 我会做这样的事情:

dplyr

data <- group_by(data, Person) %>% 
mutate(SimpleGrade = cut(Grade, breaks = c(-Inf, list.threshold[[unique(Person)]], Inf), labels = FALSE, right = TRUE, include.lowest = TRUE) - 1)

它有效,但我无法在 sparklyr 中转换此解决方案,因为每个人的阈值不同。我想我将不得不使用 ft_bucketizer 函数。到目前为止,我与 sparklyr 的关系:

sparklyr

spark_tbl <- group_by(spark_tbl, Person) %>%
ft_bucketizer(input_col  = "Grade",
            output_col = "SimpleGrade",
            splits     = c(-Inf, list.threshold[["John"]], Inf))

spark_tbl 只是数据的火花表等价物。 例如,如果我不更改阈值并仅使用 John 的阈值,它就会起作用。

非常感谢,汤姆 C。

【问题讨论】:

    标签: r apache-spark dplyr sparklyr


    【解决方案1】:

    Spark ML Bucketizer 只能用于全局操作,因此对您不起作用。相反,您可以创建一个参考表

    ref <- purrr::map2(names(list.threshold), 
       list.threshold, 
       function(name, brks) purrr::map2(
         c("-Infinity", brks), c(brks, "Infinity"),
         function(low, high) list(
           name = name, 
           low = low,
           high = high))) %>%
       purrr::flatten() %>% 
       bind_rows() %>% 
       group_by(name) %>%
       arrange(low, .by_group = TRUE) %>%
       mutate(simple_grade = row_number() - 1) %>%
       copy_to(sc, .) %>%
       mutate_at(vars(one_of("low", "high")), as.numeric)
    
    # Source: spark<?> [?? x 4]
      name    low  high simple_grade
      <chr> <dbl> <dbl>        <dbl>
    1 Jane   -Inf     3            0
    2 Jane      3     5            1
    3 Jane      5     8            2
    4 Jane      8   Inf            3
    5 John   -Inf     5            0
    6 John      5     7            1
    7 John      7   Inf            2
    8 Steve  -Inf     4            0
    9 Steve     4   Inf            1
    

    然后left_join它与数据表:

    sdf <- copy_to(sc, data)
    
    simplified <- left_join(sdf, ref, by=c("Person" = "name")) %>%
      filter(Grade >= low & Grade < High) %>%
      select(-low, -high)
    simplified
    
    # Source: spark<?> [?? x 4]
       Person  Year Grade simple_grade
       <chr>  <int> <dbl>        <dbl>
     1 John    1900     6            1
     2 John    1901     3            0
     3 John    1902     4            0
     4 John    1903     4            0
     5 John    1904     8            2
     6 John    1905     5            1
     7 John    1906     2            0
     8 John    1907     9            2
     9 John    1908     7            2
    10 Steve   1902     4            1
    # … with more rows
    
    simplified %>% dbplyr::remote_query_plan()
    
    == Physical Plan ==
    *(2) Project [Person#132, Year#133, Grade#134, simple_grade#15]
    +- *(2) BroadcastHashJoin [Person#132], [name#12], Inner, BuildRight, ((Grade#134 >= low#445) && (Grade#134 < high#446))
       :- *(2) Filter (isnotnull(Grade#134) && isnotnull(Person#132))
       :  +- InMemoryTableScan [Person#132, Year#133, Grade#134], [isnotnull(Grade#134), isnotnull(Person#132)]
       :        +- InMemoryRelation [Person#132, Year#133, Grade#134], StorageLevel(disk, memory, deserialized, 1 replicas)
       :              +- Scan ExistingRDD[Person#132,Year#133,Grade#134]
       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
          +- *(1) Project [name#12, cast(low#13 as double) AS low#445, cast(high#14 as double) AS high#446, simple_grade#15]
             +- *(1) Filter ((isnotnull(name#12) && isnotnull(cast(high#14 as double))) && isnotnull(cast(low#13 as double)))
                +- InMemoryTableScan [high#14, low#13, name#12, simple_grade#15], [isnotnull(name#12), isnotnull(cast(high#14 as double)), isnotnull(cast(low#13 as double))]
                      +- InMemoryRelation [name#12, low#13, high#14, simple_grade#15], StorageLevel(disk, memory, deserialized, 1 replicas)
                            +- Scan ExistingRDD[name#12,low#13,high#14,simple_grade#15]
    

    【讨论】:

    • 感谢您的回答。不幸的是,真实数据大约有 100 亿行。这个解决方案可以工作,但它会在 left_join 期间复制两个多行。你认为我们可以避免重复这么多行吗?
    • @TOMC 除非你省略了一些重要的细节(比如非常高的平均分割数),否则这里没有真正的问题。从物理执行计划中可以看出,过滤器被推入连接逻辑,所以重复的元组在物理上永远不会发出,只要切割是互斥的。这里唯一现实的改进是用二分搜索替换嵌套循环——但如果要带来任何实际的性能提升,这将需要 Scala 扩展。
    猜你喜欢
    • 2018-10-08
    • 1970-01-01
    • 2018-10-09
    • 1970-01-01
    • 2018-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-01-10
    相关资源
    最近更新 更多