【问题标题】:Spark streaming join Kafka topics comparisonSpark 流式加入 Kafka 主题对比
【发布时间】:2019-06-01 21:58:29
【问题描述】:

我们需要在 Kafka 主题上实现连接,并考虑延迟数据或“未加入”,这意味着流中延迟或未加入的数据不会被丢弃/丢失,但会被标记为超时,

生成连接结果以输出 Kafka 主题(如果发生超时,则提交)。

(独立部署中的 spark 2.1.1,Kafka 10)

Kafka in topic: X, Y,... out topics 结果将如下所示:

{
    "keyJoinFiled": 123456,
    "xTopicData": {},
    "yTopicData": {},
    "isTimeOutFlag": true
}

我发现三个解决方案在这里写了它们,1 和 2 来自 spark 流官方文档,但与我们无关(数据未加入 Dtsream,到达“业务时间”较晚,被丢弃/丢失)但我写它们是为了比较.

从我们看到的情况来看,使用有状态操作的 Kafka 加入主题的示例并不多,请在此处添加一些代码以供查看:

1) 根据火花流文档,

https://spark.apache.org/docs/2.1.1/streaming-programming-guide.html:   
 val stream1: DStream[String, String] = 
 val stream2: DStream[String, String] = 
 val joinedStream = stream1.join(stream2)

这将加入来自两个流批次持续时间的数据,但数据到达“业务时间”较晚/未加入将被丢弃/丢失。

2) 窗口连接:

val leftWindowDF = kafkaStreamLeft.window(Minutes(input_parameter_time))
val rightWindowDF = kafkaStreamRight.window(Minutes(input_parameter_time))
leftWindowDF.join(rightWindowDF).foreachRDD...

2.1) 在我们的例子中,我们需要使用 Tumbling 窗口来考虑 火花流批处理间隔。 2.2)需要在内存/磁盘中保存大量数据,例如30-60分钟 窗户 2.3) 数据再次延迟到达/不在窗口中/不在连接中 掉落/丢失。 * 由于 spark 2.3.1 Structured streaming stream to stream join 是 支持,但我们遇到了不清理 HDFS 状态的错误 商店,结果,每隔几个小时就有一份工作在 OOM 上下降, 在 2.4 中解决 ,https://issues.apache.org/jira/browse/SPARK-23682 (使用 Rocksdb 或 CustomStateStoreProvider HDFS 状态存储)。

3) 使用有状态操作 mapWithState 加入 Kafka 主题 Dstreams 具有翻滚窗口和延迟数据 30 分钟超时, 为输出主题生成的所有数据都包含来自所有 如果发生连接,则为主题;如果没有,则为主题数据的一部分 加入发生在 30 分钟内(用 is_time_out 标志标记)

3.1) 每个主题创建 1..n 个 Dstream,转换为 Key value/Unioned 将连接归档为键和翻转窗口的记录。 创建一个包罗万象的计划。 3.2)联合所有流 3.3) 使用函数在联合流 mapWithState 上运行 - 实际上会执行 加入/标记超时。

从数据块(spark 2.2.0)进行状态连接的好例子: https://www.youtube.com/watch?time_continue=1858&v=JAb4FIheP28

添加正在运行/测试的示例代码。

 val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "session.timeout.ms" -> "30000"
  )

  //Kafka xTopic DStream
  val kafkaStreamLeft = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](leftTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:xTopic = gson.fromJson(record.value(),classOf[xTopic])
    Unioned(Some(msg),None,if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(leftWindow),Minutes(leftWindow))

  //Kafka yTopic DStream
  val kafkaStreamRight = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](rightTopic.split(",").toSet, kafkaParams)
  ).map(record => {
    val msg:yTopic = gson.fromJson(record.value(),classOf[yTopic])
    Unioned(None,Some(msg),if (msg.sessionId!= null) msg.sessionId.toString else "")
  }).window(Minutes(rightWindow),Minutes(rightWindow))

  //convert stream to key, value pair and filter empty session id.
  val unionStream = kafkaStreamLeft.union(kafkaStreamRight).map(record =>(record.sessionId,record))
    .filter(record => !record._1.toString.isEmpty)
  val stateSpec = StateSpec.function(stateUpdateF).timeout(Minutes(timeout.toInt))

  unionStream.mapWithState(stateSpec).foreachRDD(rdd => {
    try{
      if(!rdd.isEmpty()) rdd.foreachPartition(partition =>{
        val props = new util.HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        //send to kafka result JSON.
        partition.foreach(record => {
          if(record!=null && !"".equals(record) && !"()".equals(record.toString) && !"None".equals(record.toString) ){
            producer.send(new ProducerRecord[String, String](outTopic, null, gson.toJson(record)))
          }
        })
        producer.close()
      })
    }catch {
      case e: Exception  => {
        logger.error(s""""error join topics :${leftTopic} ${rightTopic} to out topic ${outTopic}""")
        logger.info(e.printStackTrace())
      }
    }})

//mapWithState function that will be called on each key occurrence with new items in newItemValues and state items if exits.

def stateUpdateF = (keySessionId:String,newItemValues:Option[Unioned],state:State[Unioned])=> {
    val currentState = state.getOption().getOrElse(Unioned(None,None,keySessionId))

    val newVal:Unioned = newItemValues match {
      case Some(newItemValue) => {
        if (newItemValue.yTopic.isDefined)
          Unioned(if(newItemValue.xTopic.isDefined) newItemValue.xTopic else currentState.xTopic,newItemValue.yTopic,keySessionId)
        else if (newItemValue.xTopic.isDefined)
          Unioned(newItemValue.xTopic, if(currentState.yTopic.isDefined)currentState.yTopic else newItemValue.yTopic,keySessionId)
        else newItemValue
      }
      case _ => currentState //if None = timeout => currentState
    }

    val processTs = LocalDateTime.now()
    val processDate = dtf.format(processTs)
    if(newVal.xTopic.isDefined && newVal.yTopic.isDefined){//if we have a join remove from state
      state.remove()
      JoinState(newVal.sessionId,newVal.xTopic,newVal.yTopic,false,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else if(state.isTimingOut()){//time out do no try to remove state manually ,it's removed automatically.
        JoinState(newVal.sessionId, newVal.xTopic, newVal.yTopic,true,processTs.toInstant(ZoneOffset.UTC).toEpochMilli,processDate)
    }else{
      state.update(newVal)
    }
  }

  //case class for kafka topics data.(x,y topics ) join will be on session id filed.
  case class xTopic(sessionId:String,param1:String,param2:String,sessionCreationDate:String)
  case class yTopic(sessionId:Long,clientTimestamp:String)
  //catch all schema : object that contains both kafka input fileds topics and key valiue for join.
  case class Unioned(xTopic:Option[xTopic],yTopic:Option[yTopic],sessionId:String)
  //class for  output result of join stateful function.
  case class JoinState(sessionId:String, xTopic:Option[xTopic],yTopic:Option[yTopic],isTimeOut:Boolean,processTs:Long,processDate:String)

我很乐意接受一些评论。 对不起,很长的帖子。

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    我的印象是这个用例是由 Sessionization API 解决的?:

    StructuredSessionization.scala

    还有Stateful Operations in Structured Streaming

    还是我错过了什么?

    【讨论】:

    • 我们正在使用 Spark Streaming 2.1.1,并将在 2 个月内迁移到 Spark 2.4。
    • 我建议您熟悉 Spark 2.2 及更高版本中的 mapGroupsWithState(...) 和 flatMapGroupsWithState(...)。该 API 可能会为您提供想法并帮助您编写一些解决方法来解决此问题,直到升级为止。
    猜你喜欢
    • 2019-10-11
    • 1970-01-01
    • 2016-10-15
    • 2019-07-12
    • 1970-01-01
    • 2018-06-05
    • 2020-05-01
    • 2019-05-13
    • 1970-01-01
    相关资源
    最近更新 更多