【问题标题】:Spark read JDBC from SAS IOMSpark 从 SAS IOM 读取 JDBC
【发布时间】:2020-07-17 23:27:16
【问题描述】:

我正在尝试使用 Spark JDBC 从 SAS IOM 中读取数据。问题是SAS JDBC驱动有点奇怪,所以我需要创建自己的方言:

object SasDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")
  override def quoteIdentifier(colName: String): String = "\"" + colName + "\"n"
}

然而,这还不够。 SAS区分列标签(=人类可读的名称)和列名称(=您在SQL查询中使用的名称),但似乎spark使用列标签而不是模式发现中的名称,请参阅下面的JdbcUtils提取:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L293

while (i < ncols) {
  val columnName = rsmd.getColumnLabel(i + 1)

这会导致 SQL 错误,因为它试图在生成的 SQL 代码中使用人类可读的列名。

要使 SAS IOM JDBC 正常工作,这需要是 getColumnName 而不是 getColumnLabel。有没有办法在方言中指定这个?除了包装整个 com.sas.rio.MVADriver 和 resultsetmeta 之外,我真的找不到方法来解决这个问题

弗兰克

【问题讨论】:

    标签: apache-spark sas


    【解决方案1】:

    与此同时,我发现了如何做到这一点,所以只是发布以供参考。诀窍是注册自己的方言,如下所示。

    另外,SAS 用空格填充所有 varchar 列,所以我修剪所有字符串列。

      def getSasTable(sparkSession: org.apache.spark.sql.SparkSession, tablename: String): org.apache.spark.sql.DataFrame = {                                       
        val host : String = "dwhid94.msnet.railb.be";                                                                                                               
        val port : String = "48593";                                                                                                                                
        val props = new java.util.Properties();                                                                                                                     
        props.put("user", CredentialsStore.getUsername("sas"))                                                                                                      
        props.put("password", CredentialsStore.getPassword("sas"))                                                                                                  
        props.setProperty("driver", "com.sas.rio.MVADriver")                                                                                                        
        val sasconurl : String =  String.format("jdbc:sasiom://%s:%s", host, port);                                                                                 
                                                                                                                                                                    
        object SasDialect extends JdbcDialect {                                                                                                                     
          override def canHandle(url: String): Boolean = url.startsWith("jdbc:sasiom")                                                                              
          override def quoteIdentifier(colName: String): String = "\"" + colName + "\"n"                                                                            
        }                                                                                                                                                           
        JdbcDialects.registerDialect(SasDialect)                                                                                                                    
        val df = sparkSession.read                                                                                                                                  
          .option("url", sasconurl)                                                                                                                                 
          .option("driver", "com.sas.rio.MVADriver")                                                                                                                
          .option("dbtable", tablename)                                                                                                                             
          .option("user",CredentialsStore.getUsername("sas"))                                                                                                       
          .option("password",CredentialsStore.getPassword("sas"))                                                                                                   
          .option("fetchsize",100)                                                                                                                                  
          .format("jdbc")                                                                                                                                           
          .load()                                                                                                                                                   
                                                                                                                                                                    
        val strippedDf = sparkSession.createDataFrame(df.rdd.map(r => Row(r.toSeq.map(x => x match {case s: String => s.trim; case _ => x}): _*)), df.schema);      
        return strippedDf;                                                                                                                                          
      }                                                                                                                                                             
                                                                                                                                                                    
    

    【讨论】:

    • 请注意,SAS 数据集没有varchar 变量。只有固定长度的字符串和浮点数。这就是为什么如果您想在没有试用空间的情况下存储为 varchar 时需要修剪这些值。
    • 我也有同样的问题。我想通过 JDBC 从 SAS 加载数据。方法getColumnLabel 返回所有列的空字符串。不幸的是,在这种情况下,自定义方言没有帮助。你遇到过这样的问题吗?我很高兴听到有关如何应对这个麻烦的任何想法
    猜你喜欢
    • 2017-09-17
    • 1970-01-01
    • 1970-01-01
    • 2020-03-08
    • 2017-10-01
    • 1970-01-01
    • 1970-01-01
    • 2017-08-26
    • 2018-07-18
    相关资源
    最近更新 更多