给你,下面的代码甚至读取每批最新更新的维度数据,但请记住新的维度数据(在我的例子中,国家信息必须在一个新文件中)。
package com.capone.streaming.BraodcastJoin
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.capone.streaming.BroadCastStreamJoin.getClass
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.{broadcast, expr}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
object BroadCastStreamJoin2 {
def main(args: Array[String]) = {
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
Logger.getLogger("io.netty").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
val schemaUntyped1 = StructType(
Array(
StructField("id", StringType),
StructField("customrid", StringType),
StructField("customername", StringType),
StructField("countrycode", StringType),
StructField("timestamp_column_fin_1", TimestampType)
))
val schemaUntyped2 = StructType(
Array(
StructField("id", StringType),
StructField("countrycode", StringType),
StructField("countryname", StringType),
StructField("timestamp_column_fin_2", TimestampType)
))
import org.apache.spark.sql.streaming.Trigger
val factDf1 = spark.readStream
.schema(schemaUntyped1)
.option("header", "true")
//.option("maxFilesPerTrigger", 1)
.csv("src/main/resources/broadcasttest/fact")
var countrDf: Option[DataFrame] = None: Option[DataFrame]
def readDim() = {
val dimDf2 = spark.read
.schema(schemaUntyped2)
.option("header", "true")
.csv("src/main/resources/broadcasttest/dimension")
if (countrDf != None) {
countrDf.get.unpersist()
}
countrDf = Some(
dimDf2
.withColumnRenamed("id", "id_2")
.withColumnRenamed("countrycode", "countrycode_2"))
countrDf.get.show()
}
factDf1.writeStream
.outputMode("append")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.show(10)
readDim()
batchDF
.join(
countrDf.get,
expr(
"""
countrycode_2 = countrycode
"""
),
"leftOuter"
)
.show
}
.start()
.awaitTermination()
}
}