用来将两个dataStream组装成一个ConnectedStreams

而且这个connectedStream的组成结构就是保留原有的dataStream的结构体;这样我们就可以把不同的数据组装成同一个结构
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val src: DataStream[Int] = env.fromElements(1, 3, 5)
val stringMap: DataStream[String] = src.map(line => "x "+line)
val result = stringMap.connect(src).map(new CoMapFunction[String , Int , String] {
  override def map2(value: Int): String = {
    "x "+ (value + 1)
  }

  override def map1(value: String): String = {
    value
  }
})
result.print()
env.execute()

 

相关文章:

  • 2021-12-05
  • 2021-09-09
  • 2021-09-10
  • 2022-12-23
  • 2022-12-23
  • 2021-09-19
  • 2021-09-07
猜你喜欢
  • 2021-05-19
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-07-28
  • 2022-12-23
相关资源
相似解决方案