【问题标题】:Aggregation Function and Process Function on event time事件时间的聚合函数和处理函数
【发布时间】:2019-08-20 20:11:13
【问题描述】:

在此处输入代码需要使用flink聚合kafka流中的数据值并输出一个新主题。

聚合应该发生在 eventtime 上,而不是处理时间上,这意味着数据对象中的时间戳。

按照 Flink 教程中的示例,使用 TumblingEventTimeWindow ,但根本没有调用聚合 getResult 方法。

如果我更改为 TumblingProcessingTimeWIdow ,则会调用 getResult 并将结果推送到接收器。

由于它是一个传感器事件,我们需要考虑事件时间和聚合应该发生在事件时间,而不是处理时间。

import java.util.Properties

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.util.Collector
import org.apache.log4j.Logger

import scala.collection.JavaConverters._
object DownsamplingService  {


  val log=Logger.getLogger("Service")
  def main(args: Array[String]): Unit = {


    val parameter: Properties = ParameterTool.fromArgs(args).getProperties

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
    // create a checkpoint every 60 seconds

    env.enableCheckpointing(60000)
    env.setParallelism(4) //Dependsnon core
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


        //var params = new Properties
parameter.put("bootstrap.servers", "kafka-server:9092")
    parameter.put("zookeeper.connect", "zookeeper:2181")

    parameter.put("group.id", "downsampling-consumer")

    env.getConfig.setGlobalJobParameters(ParameterTool.fromArgs(args))

    val kafkaDiProducer = FlinkKafkaProducer011("engine-test2", new JsonNodeDeserializationSchema, params)




    var norTopics = List.tabulate(normalDiTopicCount)(n => s"$normalDiTopicPrefix$n")
    val serTopics =(norTopics :::  List.tabulate(serviceDITopicCount)(n => s"$serviceDITopicCount$n")).asJava



    val kafkaConsumer = new FlinkKafkaConsumer011(serTopics, new JsonNodeDeserializationSchema(), parameter)






//    val ipstream= env.addSource(kafkaConsumer).name("source")
//      .assignTimestampsAndWatermarks(new TimestampExtractor).keyBy(_.get("uuid").asText()).window(Tum.of(Time.minutes(1))).process(new MyProcessWindowFunction).name("aggregate")

    val ipstream= env.addSource(kafkaConsumer).name("source").assignTimestampsAndWatermarks(new TimestampExtractor)
      .keyBy(_.get("uuid").asText()).window(TumblingEventTimeWindows.of(Time.seconds(60))).trigger(ContinuousEventTimeTrigger.of(Time.seconds(60))).aggregate(new AverageAggregate,new MyProcessWindowFunction).name("aggregate")
    //.keyBy(_.get("uuid").asText()).window(TumblingProcessingTimeWindows.of(Time.minutes(1))).aggregate(new AverageAggregate, new MyProcessWindowFunction).name("aggregate")


    ipstream.print()

    ipstream.addSink(kafkaDiProducer).name("kafka-push")



    env.execute("aggregate-stream")






  }




  class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode](Time.minutes(1)) {
    override def extractTimestamp(element: ObjectNode): Long = element.get(Constants.Timestamp).asLong()
  }

  case class Acc[T](key:String,timeStamp:Long, min:T,max:T,count:Long,result:Double)
  //case class Result[T](min:T,max:T,count:Long,value:Double)

  // Function definitions

  /**
    * The accumulator is used to keep a running sum and a count. The [getResult] method
    * computes the average.
    */
  class AverageAggregate extends AggregateFunction[ObjectNode, Acc[Double], ObjectNode] {
    override def createAccumulator() = Acc("",0L,0,0,0L,0)


    override def add(value: ObjectNode, accumulator: Acc[Double]): Acc[Double] = {
      val valueCol=value.get("value").asDouble();
      val maxVal=if(accumulator.timeStamp<=0 || valueCol>accumulator.max){
        valueCol
      }else{
        accumulator.max
      }
      val minVal= if(accumulator.timeStamp<=0 || valueCol<accumulator.min){
        valueCol
      }else{
        accumulator.min
      }
      val timeStamp=value.get("timestamp").asLong()

      val newTimeSamp=if(accumulator.timeStamp<=0 || accumulator.timeStamp>timeStamp ){
        timeStamp
      }else{
        accumulator.timeStamp
      }

      log.info(s"${value.get("uuid").asText()} $timeStamp $newTimeSamp $maxVal $minVal"  )
      Acc(value.get("uuid").asText(),newTimeSamp,minVal,maxVal,accumulator.count+1l,valueCol+accumulator.result)
    }

    override def getResult(accumulator: Acc[Double]):ObjectNode = {

      log.info(s"${accumulator.key} ${accumulator.timeStamp} ${accumulator.result} ${accumulator.max} ${accumulator.min}"  )

      val result=JsonNodeFactory.instance.objectNode()
      result.put("uuid",accumulator.key)
      result.put("timestamp",accumulator.timeStamp)
      result.put("value_max_1_m",accumulator.max)
      result.put("value_min_1_m",accumulator.min)
      result.put("value_sum_1_m",accumulator.result)
      result.put("value_count_1_m",accumulator.count)
      result.put("value_mean_1_m",accumulator.result/accumulator.count)

      result
    } //Acc(accumulator.min,accumulator.max,accumulator.count, accumulator.sum / accumulator.count)

    override def merge(a: Acc[Double], b: Acc[Double]): Acc[Double] = {
      val maxVal=if(b.max>a.max){
        b.max
      }else{
        a.max
      }
      val minVal=if(b.min<a.min){
        b.min
      }else{
        a.min
      }


      val newTimeSamp=if(a.timeStamp<=0 || a.timeStamp>b.timeStamp ){
        b.timeStamp
      }else{
        a.timeStamp
      }



      Acc(a.key,newTimeSamp,maxVal,minVal,a.count+b.count,a.result+b.result)
    }

  }

  class MyProcessWindowFunction extends ProcessWindowFunction[ObjectNode, ObjectNode, String, TimeWindow] {

    override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[ObjectNode]): Unit = {
      val averages=elements.iterator.next()
      out.collect(averages)
    }
  }

}

聚合应该发生在 eventtime 上,而不是处理时间上,这意味着数据对象中的时间戳。

【问题讨论】:

  • 请正确格式化您的代码。
  • 以上代码仅在将 ENV Parallelism 设置为 1 时有效。然后仅生成和高级 watermater。

标签: apache-flink flink-streaming


【解决方案1】:

将 ENV Parallelism 设置为 1 时,上面的代码正在工作。然后只生成和高级的水标记。如果您的主题有多个分区,则 flink 的 kafka 消费者连接器似乎存在问题。

在 [https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission]

中解释

【讨论】:

    猜你喜欢
    • 2020-12-11
    • 1970-01-01
    • 2016-09-11
    • 2010-11-21
    • 2013-05-25
    • 2018-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多