Scala:

        .map(x=> JSON.parseObject(x))
        .assignTimestampsAndWatermarks(
          WatermarkStrategy.forBoundedOutOfOrderness[JSONObject](Duration.ofSeconds(3))
            .withTimestampAssigner(new SerializableTimestampAssigner[JSONObject] {
              override def extractTimestamp(element: JSONObject, recordTimestamp: Long): Long = element.getLong("ts")
            })
        )

 

相关文章: