我更进一步并实现了一个 DSL,具体语法为:COPY INTO <table/stage> FROM (SELECT FROM <table/stage>)...,这是我们的用例。
下面是 DSL 的主要代码,稍加简化。您将看到对未显示 impl 的各种自定义元素的引用,主要是它们扩展 CustomQueryPart。
请注意,它在 Scala 中 - 如果您不熟悉,请提前道歉。当我找到时间我将编辑并转换为 Java:
trait SnowflakeDSLContext extends DSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]) : CopyIntoFromStep2[R, T1, T2]
}
class SnowflakeDSLContextImpl(config: Configuration)
extends DefaultDSLContext(config) with SnowflakeDSLContext {
def copyInto[R <: Record, T1, T2](table: Table[R],
f1: Field[T1],
f2: Field[T2]): CopyIntoFromStep2[R, T1, T2] =
new CopyIntoImpl[R, T1, T2](this, table, Seq(f1, f2))
trait CopyIntoFromStep2[R <: Record, T1, T2] {
def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep
}
trait CopyIntoParametersStep extends CopyIntoFinalStep {
def fileFormat(formatType: FileFormatType,
maybeComprType: Option[CompressionType] = None): CopyIntoParametersStep
def purge(enabled: Boolean): CopyIntoParametersStep
}
trait CopyIntoFinalStep {
def fetchResults(): Seq[CopyIntoResult]
}
// This is the main class which "does the trick"
// by accumulating the query parts and building the final query
case class CopyIntoImpl[R <: Record, T1, T2](
dslContext: DSLContext,
table: Table[R],
fields: Seq[Field[_]],
maybeSelect: Option[Select[_ <: Record2[T1, T2]]] = None,
maybeFileFormat: Option[FileFormat] = None,
copyOptions: Seq[SingleValueParam[_]] = Nil
) extends CopyIntoFromStep2[R, T1, T2] with CopyIntoParametersStep {
override def from(select: Select[_ <: Record2[T1, T2]]): CopyIntoParametersStep = {
copy(maybeSelect = Some(select))
}
override def fileFormat(formatType: FileFormatType,
maybeCompr: Option[CompressionType] = None): CopyIntoParametersStep = {
copy(maybeFileFormat = Some(FileFormat(formatType, maybeCompr)))
}
override def purge(enabled: Boolean): CopyIntoParametersStep =
copy(copyOptions = copyOptions :+ Purge(enabled))
override def fetchResults(): Seq[CopyIntoResult] = {
dslContext.resultQuery("COPY INTO {0} {1} FROM ({2}) {3} {4}",
table,
UnqualifiedFieldList(fields),
maybeSelect.getOrElse(""),
maybeFileFormat.getOrElse(""),
LineDelimitedList(copyOptions))
.fetch(CopyIntoResultMapper)
.asScala
.toSeq
}
}
用法示例:
val results = dslContext.copyInto(targetTable, field1, field2)
.from(DSL.select(field1, field2)
.from(sourceTable))
.fileFormat(JSON, Some(GZIP))
.purge(true)
.fetchResults()