Question: How to define a join condition in stream-batch streaming join?
Posted: 2020-03-28 06:27:33
Description:

I am using spark-sql 2.4.1 with Java 1.8, and the Kafka connector spark-sql-kafka-0-10_2.11 version 2.4.3.

I am trying to join a static DataFrame (i.e. metadata) with a streaming DataFrame, as follows:

    Dataset<Row> streamingDs = ...; // read from Kafka topic
    Dataset<Row> staticDf = ...;    // read from Oracle metadata table

    Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i"),
        "c.code = i.industry_code");

Even though the corresponding columns exist in the DataFrames, it fails with the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `c.code = i.industry_code` cannot be resolved on the left side of the join. The left-side columns: [id, tranasctionDate, companyName,code];

I then tried the following:

    Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i"),
        "c.code = i.industry_code",
        "inner");

which gives this compile error:

The method join(Dataset, String) in the type Dataset is not applicable for the arguments (Dataset, String, String)


Tags: java apache-spark apache-spark-sql spark-structured-streaming


Answer 1:

tl;dr `c.code = i.industry_code` is treated as the name of a column to join on (a USING column), not as a join expression.
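To make the tl;dr concrete, here is a minimal sketch of what the single-`String` overload actually does. It assumes two small in-memory DataFrames stand in for `streamingDs` and `staticDf` and that a local SparkSession replaces the Kafka and Oracle sources; the class name, the `twoColumns` helper, and the sample rows are hypothetical. `join(Dataset, String)` performs a USING join, so the string must be a column name present on both sides; renaming `industry_code` to `code` makes it valid.

```java
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class UsingColumnJoinSketch {

    // Hypothetical helper: a tiny two-column DataFrame standing in for a real source.
    static Dataset<Row> twoColumns(SparkSession spark, String key, String value, List<Row> rows) {
        StructType schema = new StructType()
            .add(key, DataTypes.StringType)
            .add(value, DataTypes.StringType);
        return spark.createDataFrame(rows, schema);
    }

    static Dataset<Row> joinOnCode(SparkSession spark) {
        Dataset<Row> events = twoColumns(spark, "code", "companyName", Arrays.asList(
            RowFactory.create("A1", "Acme"),
            RowFactory.create("B2", "Globex"),
            RowFactory.create("Z9", "Initech")));
        Dataset<Row> industries = twoColumns(spark, "industry_code", "industry_name", Arrays.asList(
            RowFactory.create("A1", "Aerospace"),
            RowFactory.create("B2", "Banking")));

        // join(Dataset, String) is a USING join: the String is a column NAME that must
        // exist on both sides, so rename industry_code to code before joining.
        return events.join(industries.withColumnRenamed("industry_code", "code"), "code");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("using-column-join-sketch")
            .getOrCreate();
        joinOnCode(spark).show(); // a single "code" column; only A1 and B2 match
        spark.stop();
    }
}
```

A USING join also keeps a single `code` column in the result instead of one per side.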


Change the code as follows:

    Dataset<Row> joinDf = streamingDs.as("c")
        .join(staticDf.as("i"))             // INNER JOIN is the default
        .where("c.code = i.industry_code");
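If you prefer to put the condition inside the join itself, or need a join type other than inner, pass it as a `Column` rather than a `String`. In Spark 2.4 the overloads that take a condition are `join(Dataset, Column)` and `join(Dataset, Column, String joinType)`; there is no `join(Dataset, String, String)`, which is why the second attempt in the question does not compile. The sketch below assumes small in-memory DataFrames stand in for the question's `streamingDs` and `staticDf`; the class name, helper methods, and rows are hypothetical. It shows the condition built both with `functions.expr` and with the Column API.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;

import java.util.Arrays;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ColumnConditionJoinSketch {

    // Hypothetical stand-in for the Kafka-backed streamingDs.
    static Dataset<Row> events(SparkSession spark) {
        StructType schema = new StructType()
            .add("id", DataTypes.StringType)
            .add("code", DataTypes.StringType);
        return spark.createDataFrame(Arrays.asList(
            RowFactory.create("1", "A1"),
            RowFactory.create("2", "B2"),
            RowFactory.create("3", "Z9")), schema);
    }

    // Hypothetical stand-in for the Oracle-backed staticDf.
    static Dataset<Row> industries(SparkSession spark) {
        StructType schema = new StructType()
            .add("industry_code", DataTypes.StringType)
            .add("industry_name", DataTypes.StringType);
        return spark.createDataFrame(Arrays.asList(
            RowFactory.create("A1", "Aerospace"),
            RowFactory.create("B2", "Banking")), schema);
    }

    static Dataset<Row> innerJoin(SparkSession spark) {
        // The SQL-style condition string is parsed into a Column by expr().
        Column condition = expr("c.code = i.industry_code");
        return events(spark).as("c").join(industries(spark).as("i"), condition, "inner");
    }

    static Dataset<Row> leftOuterJoin(SparkSession spark) {
        // The same condition built with the Column API instead of a SQL string.
        Column condition = col("c.code").equalTo(col("i.industry_code"));
        return events(spark).as("c").join(industries(spark).as("i"), condition, "left_outer");
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
        innerJoin(spark).show();     // only events whose code has a matching industry_code
        leftOuterJoin(spark).show(); // every event; unmatched ones get nulls on the right
        spark.stop();
    }
}
```

With a streaming Dataset on the left the same calls apply; Spark 2.4 supports inner and left outer stream-static joins in that orientation, but not right or full outer joins.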
    


Answer 2:

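Here you go. The code below re-reads the dimension data on every micro-batch, so the latest updates are picked up; keep in mind, though, that new dimension data (country information, in my case) must arrive in a new file.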

    package com.capone.streaming.BraodcastJoin

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.expr
    import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

    object BroadCastStreamJoin2 {

      def main(args: Array[String]): Unit = {

        // Silence noisy library logging
        Logger.getLogger("akka").setLevel(Level.WARN)
        Logger.getLogger("org").setLevel(Level.ERROR)
        Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
        Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
        Logger.getLogger("io.netty").setLevel(Level.ERROR)

        val spark = SparkSession
          .builder()
          .master("local")
          .getOrCreate()

        // Schema of the streaming fact files
        val schemaUntyped1 = StructType(
          Array(
            StructField("id", StringType),
            StructField("customrid", StringType),
            StructField("customername", StringType),
            StructField("countrycode", StringType),
            StructField("timestamp_column_fin_1", TimestampType)
          ))

        // Schema of the static dimension (country) files
        val schemaUntyped2 = StructType(
          Array(
            StructField("id", StringType),
            StructField("countrycode", StringType),
            StructField("countryname", StringType),
            StructField("timestamp_column_fin_2", TimestampType)
          ))

        val factDf1 = spark.readStream
          .schema(schemaUntyped1)
          .option("header", "true")
          //.option("maxFilesPerTrigger", 1)
          .csv("src/main/resources/broadcasttest/fact")

        // Most recent snapshot of the dimension data
        var countrDf: Option[DataFrame] = None

        // Re-reads the whole dimension directory, including any newly added files
        def readDim(): Unit = {
          val dimDf2 = spark.read
            .schema(schemaUntyped2)
            .option("header", "true")
            .csv("src/main/resources/broadcasttest/dimension")

          countrDf.foreach(_.unpersist()) // release the previous snapshot, if any

          // Rename the overlapping columns so the join condition is unambiguous
          countrDf = Some(
            dimDf2
              .withColumnRenamed("id", "id_2")
              .withColumnRenamed("countrycode", "countrycode_2"))

          countrDf.get.show()
        }

        factDf1.writeStream
          .outputMode("append")
          .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
            batchDF.show(10)
            readDim() // refresh the dimension data before joining this micro-batch

            batchDF
              .join(countrDf.get, expr("countrycode_2 = countrycode"), "leftOuter")
              .show()
          }
          .start()
          .awaitTermination()
      }
    }
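For Java users, a rough translation of the same per-micro-batch refresh might look like the sketch below. It is not the answerer's code and rests on several assumptions: CSV fact and dimension files in local directories, `Trigger.Once()` so each run processes the files available and then stops (the Scala version runs continuously), an explicit `checkpointLocation`, and counting matched rows instead of calling `show()`. The class name, `runOnce`, `writeCsv`, and the sample data are hypothetical. The `foreachBatch` function is declared as a typed `VoidFunction2` variable so the Java overload is selected unambiguously.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ForeachBatchDimRefreshSketch {

    static final StructType FACT_SCHEMA = new StructType()
        .add("id", DataTypes.StringType)
        .add("countrycode", DataTypes.StringType);

    static final StructType DIM_SCHEMA = new StructType()
        .add("countrycode", DataTypes.StringType)
        .add("countryname", DataTypes.StringType);

    // Hypothetical helper: writes a header line plus rows as one CSV file in dir.
    static void writeCsv(Path dir, String fileName, String... lines) throws Exception {
        Files.write(dir.resolve(fileName), Arrays.asList(lines), StandardCharsets.UTF_8);
    }

    // Processes the fact files not yet recorded in this checkpoint, then stops.
    // Returns how many fact rows found a matching dimension row in this run.
    static long runOnce(SparkSession spark, Path factDir, Path dimDir, Path checkpointDir)
            throws Exception {
        AtomicLong matched = new AtomicLong();

        Dataset<Row> facts = spark.readStream()
            .schema(FACT_SCHEMA)
            .option("header", "true")
            .csv(factDir.toString());

        // Typed variable so the Java overload of foreachBatch is picked unambiguously.
        VoidFunction2<Dataset<Row>, Long> joinWithFreshDim = (batchDF, batchId) -> {
            // Re-read the whole dimension directory for every micro-batch.
            Dataset<Row> dim = batchDF.sparkSession().read()
                .schema(DIM_SCHEMA)
                .option("header", "true")
                .csv(dimDir.toString())
                .withColumnRenamed("countrycode", "countrycode_2");
            Dataset<Row> joined =
                batchDF.join(dim, expr("countrycode_2 = countrycode"), "left_outer");
            matched.addAndGet(joined.filter(col("countryname").isNotNull()).count());
        };

        StreamingQuery query = facts.writeStream()
            .option("checkpointLocation", checkpointDir.toString())
            .trigger(Trigger.Once())
            .foreachBatch(joinWithFreshDim)
            .start();
        query.awaitTermination();
        return matched.get();
    }

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
        Path factDir = Files.createTempDirectory("fact");
        Path dimDir = Files.createTempDirectory("dimension");
        Path checkpointDir = Files.createTempDirectory("checkpoint");

        writeCsv(dimDir, "dim-1.csv", "countrycode,countryname", "IN,India", "US,United States");
        writeCsv(factDir, "fact-1.csv", "id,countrycode", "1,IN", "2,US", "3,FR");
        System.out.println(runOnce(spark, factDir, dimDir, checkpointDir)); // FR not matched yet

        // New dimension data arrives as a NEW file; the next batch re-reads and sees it.
        writeCsv(dimDir, "dim-2.csv", "countrycode,countryname", "FR,France");
        writeCsv(factDir, "fact-2.csv", "id,countrycode", "4,FR");
        System.out.println(runOnce(spark, factDir, dimDir, checkpointDir));
        spark.stop();
    }
}
```

Because the dimension directory is re-read in every batch, a file added to it between runs (or between micro-batches of a continuously running query) is visible to the next batch, which is the behaviour the Scala answer relies on.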
      
