【Question title】: Connecting to Redshift using sparklyr or SparkR?
【Posted】: 2019-07-14 14:09:29
【Question description】:

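I am trying to understand how to connect R to Redshift through Spark. I cannot connect with plain RPostgres because the dataset is large and needs distributed computation.

So far I can read a CSV from S3 and write it into a Spark DataFrame. Can someone explain how to configure the JARs and other settings so that I can connect sparklyr (spark_read_jdbc()) or SparkR to Redshift?

It would also be helpful if you could show how to add JARs to the SparkContext.

So far I have found that Databricks provides some of the JARs needed to access a Redshift database through a JDBC URL.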

【Question comments】:

    Tags: r amazon-redshift databricks sparkr sparklyr


    【Solution 1】:
    rm(list=ls())
    library(sparklyr)
    #library(SparkR)
    #detach('SparkR')
    Sys.setenv("SPARK_MEM" = "15G")
    config <- spark_config()
    config$`sparklyr.shell.driver-memory` <- "8G"
    config$`sparklyr.shell.executor-memory` <- "8G"
    config$`spark.yarn.executor.memoryOverhead` <- "6GB"
    config$`spark.dynamicAllocation.enabled`   <- "TRUE"
    # Put the Redshift JDBC driver on the driver classpath (passed to spark-submit as --driver-class-path).
    # This needs its own key: assigning it to driver-java-options would be overwritten by the tmpdir setting below.
    config$`sparklyr.shell.driver-class-path` <- "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"
    spark_dir <- "/tmp/spark_temp"
    config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", spark_dir)
    sc <- spark_connect(master = "local[*]", config = config)
    #sc <- spark_connect(master = "local")
    
    ### Get the underlying SparkContext
    ctx <- sparklyr::spark_context(sc)
    # Wrap it in an org.apache.spark.api.java.JavaSparkContext
    jsc <- sparklyr::invoke_static(sc, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext", ctx)
    ## Get the Hadoop configuration and set the S3A credentials on it
    hconf <- jsc %>% sparklyr::invoke("hadoopConfiguration")
    hconf %>% sparklyr::invoke("set", "fs.s3a.access.key", "<your access key for s3>")
    hconf %>% sparklyr::invoke("set", "fs.s3a.secret.key", "<your secret key for s3>")
    hconf %>% sparklyr::invoke("set", "fs.s3a.endpoint", "<your region of s3 bucket>")

    hconf %>% sparklyr::invoke("set", "com.amazonaws.services.s3.enableV4", "true")
    # On the Hadoop configuration itself the key carries no "spark.hadoop." prefix
    hconf %>% sparklyr::invoke("set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    hconf %>% sparklyr::invoke("set", "fs.s3a.impl.disable.cache", "true")

    ?spark_read_csv

    ### Read from S3 buckets
    spark_read_csv(sc=sc,name='sr',path="s3a://my-bucket/tmp/2district.csv",memory = TRUE)
    spark_read_csv(sc=sc,name='sr_disk3',path="s3a://my-bucket/tmp/changed/",memory = FALSE)
    ### Read from the local drive
    spark_read_csv(sc=sc,name='raw_data_loc_in3',path="/tmp/distance.csv",memory = TRUE)
    spark_read_csv(sc=sc,name='raw_data_loc_in5',path="/tmp/distance.csv",memory = TRUE)

    #### Read from a Redshift table
    # A subquery passed as dbtable needs an alias, because Spark wraps it in "SELECT * FROM <dbtable>"
    sales_tbl <- sparklyr::spark_read_jdbc(sc, "connection", options = list(
      url = "jdbc:redshift://<URL>:<Port>/<dbName>",
      user = "<user_name>",
      password = "<password>",
      dbtable = "(select * from sales limit 1000) as sales_sample",
      tempS3Dir = "s3a://my-bucket/migration"), memory = TRUE, overwrite = TRUE, repartition = 3)

    #### Write the Spark DataFrame to CSV on the local disk
    sparklyr::spark_write_csv(sales_tbl, path = 'sample.csv')
    #### Write the Spark DataFrame to CSV on S3
    sparklyr::spark_write_csv(sales_tbl, path = 's3a://my-bucket/output/')
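
Since the question is about a table too large for a single connection, a partitioned JDBC read may be worth trying so that several tasks pull from Redshift in parallel. The following is only a sketch under stated assumptions: the helper `redshift_jdbc_options`, the numeric column `sale_id`, and the environment variables `REDSHIFT_USER` and `REDSHIFT_PASSWORD` are all hypothetical, while `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` are standard Spark JDBC data source options.

```r
# Hypothetical helper: builds the options list for a partitioned JDBC read.
# Bounds are formatted without scientific notation, because
# as.character(1000000) would yield "1e+06", which Spark cannot parse.
redshift_jdbc_options <- function(url, dbtable, partition_column,
                                  lower_bound, upper_bound,
                                  num_partitions = 8L,
                                  user = Sys.getenv("REDSHIFT_USER"),
                                  password = Sys.getenv("REDSHIFT_PASSWORD")) {
  stopifnot(lower_bound < upper_bound, num_partitions >= 1)
  as_plain <- function(x) format(x, scientific = FALSE, trim = TRUE)
  list(
    url = url,
    user = user,
    password = password,
    dbtable = dbtable,
    partitionColumn = partition_column,
    lowerBound = as_plain(lower_bound),
    upperBound = as_plain(upper_bound),
    numPartitions = as_plain(num_partitions)
  )
}

opts <- redshift_jdbc_options(
  url = "jdbc:redshift://<URL>:<Port>/<dbName>",
  dbtable = "sales",
  partition_column = "sale_id",  # assumed numeric column, not from the original post
  lower_bound = 1,
  upper_bound = 1000000
)

# With an open sparklyr connection `sc`, the list would be passed like this:
# sales_part <- sparklyr::spark_read_jdbc(sc, "sales_part", options = opts, memory = FALSE)
```

The bounds only decide how the range of `sale_id` is split across partitions; rows outside the range are still read, so rough estimates are enough.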
    

    【Comments】:

    • I worked this out myself and posted the answer above so that others can find it easily. If you need help with the JARs, leave a comment here.
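
On the JAR question for SparkR, which the answer above does not cover: a rough sketch, assuming the same Redshift JDBC driver JAR is available locally. The wrapper name `read_redshift_sparkr` and its arguments are hypothetical; `sparkR.session()` (with `sparkJars` and `sparkConfig`) and `read.jdbc()` are the SparkR functions it relies on. It has not been run against a real cluster.

```r
# Hypothetical wrapper: defining it does not start Spark or require SparkR to be loaded.
read_redshift_sparkr <- function(jar_path, url, table, user, password) {
  # Ship the JDBC driver JAR with the session and put it on the driver classpath
  SparkR::sparkR.session(
    master = "local[*]",
    sparkJars = jar_path,
    sparkConfig = list(spark.driver.extraClassPath = jar_path)
  )
  # Extra named arguments are passed on as JDBC connection properties
  SparkR::read.jdbc(url, table, user = user, password = password)
}

# Example call (placeholders as in the answer above):
# df <- read_redshift_sparkr(
#   "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar",
#   "jdbc:redshift://<URL>:<Port>/<dbName>", "sales", "<user_name>", "<password>")
```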