flatMap函数

//初始化执行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//加载数据
val data = env.fromElements(("A" , 1) , ("B" , 1) , ("C" , 1))
//使用trasformation加载这些数据
//TODO map
val map_result = data.map(line => line._1+line._2)
map_result.print()
//TODO flatmap
val flatmap_result = data.flatMap(line => line._1+line._2)
flatmap_result.print()

练习:如下数据

A;B;C;D;B;D;C
B;D;A;E;D;C
A;B

要求:统计相邻字符串出现的次数

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
/**
  * Created by angel;
  */
object demo {
/**
A;B;C;D;B;D;C
B;D;A;E;D;C
A;B
统计相邻字符串出现的次数(A+B , 2) (B+C , 1)....
  * */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("A;B;C;D;B;D;C;B;D;A;E;D;C;A;B")
    val map_data: DataSet[Array[String]] = data.map(line => line.split(";"))
    //[A,B,C,D] ---"A,B,C,D"
    //[A,B,C,D] ---> (x,1) , (y,1) -->groupBy--->sum--total
    val tupe_data = map_data.flatMap{
      line =>
        for(index <- 0 until line.length-1) yield (line(index)+"+"+line(index+1) , 1)
    }
    val gropudata = tupe_data.groupBy(0)
    val result = gropudata.sum(1)
    result.print()
  }
}
View Code

相关文章: