【问题标题】:R - How to replicate rows in a spark dataframe using sparklyrR - 如何使用 sparklyr 复制火花数据框中的行
【发布时间】:2017-11-15 18:24:44
【问题描述】:

有没有办法使用 sparklyr/dplyr 的函数来复制 Spark 数据帧的行?

sc <- spark_connect(master = "spark://####:7077")

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")

这是所需的输出,保存到新的 spark tbl 中:

> df2_tbl
   row1  row2
  <int> <chr>
1     1     A
2     1     A
3     1     A
4     2     B
5     2     B
6     2     B
7     3     C
8     3     C
9     3     C

【问题讨论】:

    标签: r apache-spark sparklyr


    【解决方案1】:

    首先想到的想法是使用explode 函数(这正是它在Spark 中的含义)。然而,SparkR 似乎不支持数组(据我所知)。

    > structField("a", "array")
    Error in checkType(type) : Unsupported type for SparkDataframe: array
    

    不过,我可以提出另外两种方法:

    1. 一个简单但不是很优雅的:

      head(rbind(df, df, df), n=30)
      #    row1 row2
      # 1    1    A
      # 2    2    B
      # 3    3    C
      # 4    1    A
      # 5    2    B
      # 6    3    C
      # 7    1    A
      # 8    2    B
      # 9    3    C
      

      或者使用 for 循环以获得更多通用性:

      df2 = df
      for(i in 1:2) df2=rbind(df, df2)
      

      请注意,这也适用于 union

    2. 第二种更优雅的方法(因为它只意味着一次 spark 操作)基于交叉连接(笛卡尔积),数据帧大小为 3(或任何其他数字):

      j <- as.DataFrame(data.frame(s=1:3))
      head(drop(crossJoin(df, j), "s"), n=100)
      #    row1 row2
      # 1    1    A
      # 2    1    A
      # 3    1    A
      # 4    2    B
      # 5    2    B
      # 6    2    B
      # 7    3    C
      # 8    3    C
      # 9    3    C
      

    【讨论】:

    • 应该是array&lt;type&gt; 而不是array 例如structField("a", "array&lt;string&gt;")
    【解决方案2】:

    我不知道 R 的 repfunction 的集群端版本。但是,我们可以使用连接来模拟它的集群端。

    df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")
    
    replyr <- function(data, n, sc){
      joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1,n)), "tmp_joining_frame", overwrite = TRUE)
    
      data %>%
        mutate(joiner_index = 1) %>%
        left_join(joiner_frame) %>%
        select(-joiner_index)
    
    }
    
    df_tbl2 <- replyr(df_tbl, 3, sc)
    #    row1 row2 
    #    <int> <chr>
    # 1     1 A    
    # 2     1 A    
    # 3     1 A    
    # 4     2 B    
    # 5     2 B    
    # 6     2 B    
    # 7     3 C    
    # 8     3 C    
    # 9     3 C  
    

    它完成了工作,但它有点脏,因为tmp_joining_frame 将持续存在。鉴于对函数的多次调用的惰性评估,我不确定这将如何工作。

    【讨论】:

      【解决方案3】:

      使用sparklyr,您可以按照@Oli 的建议使用arrayexplode

      df_tbl %>% 
        mutate(arr = explode(array(1, 1, 1))) %>% 
        select(-arr)
      
      # # Source:   lazy query [?? x 2]
      # # Database: spark_connection
      #    row1 row2 
      #   <int> <chr>
      # 1     1 A    
      # 2     1 A    
      # 3     1 A    
      # 4     2 B    
      # 5     2 B    
      # 6     2 B    
      # 7     3 C    
      # 8     3 C    
      # 9     3 C    
      

      和广义的

      library(rlang)
      
      df_tbl %>%  
        mutate(arr = !!rlang::parse_quo(
          paste("explode(array(", paste(rep(1, 3), collapse = ","), "))")
        )) %>% select(-arr)
      
      # # Source:   lazy query [?? x 2]
      # # Database: spark_connection
      #    row1 row2 
      #   <int> <chr>
      # 1     1 A    
      # 2     1 A    
      # 3     1 A    
      # 4     2 B    
      # 5     2 B    
      # 6     2 B    
      # 7     3 C    
      # 8     3 C    
      # 9     3 C   
      

      您可以轻松调整行数。

      【讨论】:

      • 顺便说一句,对于较新版本的 rlang,必须将环境传递给 parse_quo,因此您可以使用 parse_quo(paste(...), env = sc),其中 sc 是 Spark 上下文 :)
      猜你喜欢
      • 2021-01-08
      • 2018-12-19
      • 1970-01-01
      • 2018-10-30
      • 2020-05-24
      • 2020-03-05
      • 1970-01-01
      • 2020-04-21
      • 2017-03-16
      相关资源
      最近更新 更多