【问题标题】:Scala to fetch the details from the oracle table and fire the query in hive tableScala 从 oracle 表中获取详细信息并在 hive 表中触发查询
【发布时间】:2020-09-09 04:39:28
【问题描述】:

我正在尝试连接到 oracle 数据库,为此我必须使用纯 scala 连接而不是 spark 连接。所以我写了oracle数据库的连接代码。

现在主要的头疼是我在 oracle 表中有一个列,它为每一行编写了一个选择查询(该表有一些元数据)。 我需要获取写在每一列中的查询并触发它将在 hive 表上并将查询结果存储在数据框中。我不确定用什么方法来解决上述问题。

Oracle 表数据

我能够使用纯 scala 连接连接到 oracle 表。我需要获取查询 col 数据并触发它。我需要将查询结果存储在数据框中以供进一步处理。

连接代码:-

    object ScalaJdbcConnectSelect {

  def main(args: Array[String]) {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/mysql"
    val username = "root"
    val password = "root"

    
    var connection:Connection = null

    try {
      // make the connection
      Class.`enter code here`forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // create the statement, and run the select query
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery(query)`

【问题讨论】:

  • 向我们展示您的连接代码?你从你的连接/查询中得到了什么? java.sql.ResultSet?
  • 我已经在帖子中添加了连接代码。我正在从 statment.executequery 返回 java 结果集。

标签: database scala apache-spark


【解决方案1】:

你可以从 Spark 连接到 Oracle,你应该这样做 that,否则你将不得不在使用 Spark 读取数据之前将所有数据按顺序放入某个中间存储,这与拥有多个连接相比非常浪费到数据库并行检索数据。

// Set the variables server, port, service
val url = s"jdbc:oracle:thin:@$server:$port:$service"

// Add odbc6.jar via --driver-class-path and --jars during spark-shell/submit 
val reader = spark.read.format("jdbc")
  .option("url", url)
  .option("user", user)
  .option("password", password)
  .option("driver", "oracle.jdbc.driver.OracleDriver")

// Note the use of partitionColumn is necessary to create multiple connections 
// from the workers
val df = reader.option("dbtable", "db.table")
               .option("partitionColumn", "col1")
               .load
val dfWithQuery = reader.option("dbtable", "(SELECT a, b, c FROM t1) AS tbl1")
                  .option("partitionColumn", "a")
                  .load

你首先得到你的结果df然后可以收集查询列,循环并将收集的查询添加到reader以创建一个新的DataFrame

如果你坚持使用你的方法,那么你已经有了ResultSet2, 。您只需在遍历行时获取您的列,然后将查询与您的 Connection 一起使用。

// ^ Your code above, Connection already exists
while (resultSet.next()){
  try {
    val queryValue = resultSet.getString("query")
    val queryResultSet = statement.executeQuery(queryValue)
    while (queryResultSet){
      // Do stuff with your newly queried ResultSet
    }
  }
}

您可以看到多级try catch 不是很漂亮。

如果您愿意,您可以从ResultSet 创建一个Dataset,但请注意,这需要您首先将整个RestulSet 加载到驱动程序的内存中,如果数据很大,您可能会用完。

def resultSetToSpark[T](rs: ResultSet, f: ResultSet => T, 
                        spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
  val data: Seq[T] = Iterator.continually(rs.next, rs)
    .takeWhile(_._1).map{
      case (_,rs) => f(rs)
    }.toList

  spark.createDataset(data)(encoder)
}

注意,您必须提供一个函数,该函数从ResultSet 获取单行并从检索到的值中实例化class,以及您的类的编码器。下面是一个例子。

case class Potato(a: String, b: String, c: String)

def parseResultSet(rs: ResultSet): Potato = Potato(
 rs.getString("a"), rs.getString("b"), rs.getString("c")
)

import org.apache.spark.sql.{Encoder, Encoders}
val encoder: Encoder[Potato] =  Encoders.product[Potato]

// Use like this
val dfFromRS = resultSetToSpark(resultSet, parseResultSet, spark, encoder)

因此,最好的方法确实是使用 Spark 的 DataFrameReader 并让 Spark 工作人员与数据库建立多个连接,但这仅在相关数据可能导致驱动程序内存不足时才有意义。


您总是可以像这样获取所有查询的列表。

case class Query(statement: String)
implicit val queryEncoder: Encoder[Query] = Encoders.product[Query]
val queryDs = resultSetToSpark(rs)(rs => Message(rs.getString("query")))(spark, queryEncoder)
val queryList = queryDs.collect.toList
val df1 = spark.sql(queryList.head) // get a DataFrame from the first query in the list

【讨论】:

  • 感谢您的详细回复。查询 col 有 sql 语句,我需要在 hive 表上运行..我怎样才能做到这一点。对于与 oracle 的连接,我使用的是纯 scala 连接而不是 spark 连接。
  • @Saurabh 那么当您访问 Hive 表时,您想对它们做什么?
  • 我的问题陈述是我在 col_1 中有一个 oracle 表有查询。需要在 hive 表上触发查询,并且我从 hive 表中获得的查询结果需要存储在要触发的每个查询的数据框中。
  • 我连接到 oracle 表并从 resultset.Net 中的表中获取数据我需要在 hive 表上触发查询
  • 也许我把这件事复杂化了。你知道你可以使用spark.sql(query) 来获得DataFrame 吗?或者在对它们进行操作之前,您是否想要一份所有查询的列表?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-22
  • 1970-01-01
  • 2013-11-07
  • 2012-06-02
  • 1970-01-01
相关资源
最近更新 更多