你可以从 Spark 连接到 Oracle,你应该这样做 that,否则你将不得不在使用 Spark 读取数据之前将所有数据按顺序放入某个中间存储,这与拥有多个连接相比非常浪费到数据库并行检索数据。
// Set the variables server, port, service
val url = s"jdbc:oracle:thin:@$server:$port:$service"
// Add odbc6.jar via --driver-class-path and --jars during spark-shell/submit
val reader = spark.read.format("jdbc")
.option("url", url)
.option("user", user)
.option("password", password)
.option("driver", "oracle.jdbc.driver.OracleDriver")
// Note the use of partitionColumn is necessary to create multiple connections
// from the workers
val df = reader.option("dbtable", "db.table")
.option("partitionColumn", "col1")
.load
val dfWithQuery = reader.option("dbtable", "(SELECT a, b, c FROM t1) AS tbl1")
.option("partitionColumn", "a")
.load
你首先得到你的结果df然后可以收集查询列,循环并将收集的查询添加到reader以创建一个新的DataFrame。
如果你坚持使用你的方法,那么你已经有了ResultSet2, 。您只需在遍历行时获取您的列,然后将查询与您的 Connection 一起使用。
// ^ Your code above, Connection already exists
while (resultSet.next()){
try {
val queryValue = resultSet.getString("query")
val queryResultSet = statement.executeQuery(queryValue)
while (queryResultSet){
// Do stuff with your newly queried ResultSet
}
}
}
您可以看到多级try catch 不是很漂亮。
如果您愿意,您可以从ResultSet 创建一个Dataset,但请注意,这需要您首先将整个RestulSet 加载到驱动程序的内存中,如果数据很大,您可能会用完。
def resultSetToSpark[T](rs: ResultSet, f: ResultSet => T,
spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
val data: Seq[T] = Iterator.continually(rs.next, rs)
.takeWhile(_._1).map{
case (_,rs) => f(rs)
}.toList
spark.createDataset(data)(encoder)
}
注意,您必须提供一个函数,该函数从ResultSet 获取单行并从检索到的值中实例化class,以及您的类的编码器。下面是一个例子。
case class Potato(a: String, b: String, c: String)
def parseResultSet(rs: ResultSet): Potato = Potato(
rs.getString("a"), rs.getString("b"), rs.getString("c")
)
import org.apache.spark.sql.{Encoder, Encoders}
val encoder: Encoder[Potato] = Encoders.product[Potato]
// Use like this
val dfFromRS = resultSetToSpark(resultSet, parseResultSet, spark, encoder)
因此,最好的方法确实是使用 Spark 的 DataFrameReader 并让 Spark 工作人员与数据库建立多个连接,但这仅在相关数据可能导致驱动程序内存不足时才有意义。
您总是可以像这样获取所有查询的列表。
case class Query(statement: String)
implicit val queryEncoder: Encoder[Query] = Encoders.product[Query]
val queryDs = resultSetToSpark(rs)(rs => Message(rs.getString("query")))(spark, queryEncoder)
val queryList = queryDs.collect.toList
val df1 = spark.sql(queryList.head) // get a DataFrame from the first query in the list