map 与 flatMap 函数对比
// Initialize the execution environment.
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

// Source data: three (letter, count) tuples.
val data = env.fromElements(("A", 1), ("B", 1), ("C", 1))

// Apply transformations to the data.

// map: exactly one output element per input element -> "A1", "B1", "C1".
val mapResult = data.map(line => line._1 + line._2)
mapResult.print()

// flatMap: each input element may produce zero or more output elements,
// which are flattened into one DataSet. The function builds the String "A1";
// a String is a sequence of Chars, so the output is the individual characters
// 'A', '1', 'B', '1', 'C', '1'. `.toSeq` makes that String -> Seq[Char] step
// explicit instead of relying on an implicit conversion.
val flatMapResult = data.flatMap(line => (line._1 + line._2).toSeq)
flatMapResult.print()
练习：给定如下数据（每行是一条独立的记录）：
A;B;C;D;B;D;C
B;D;A;E;D;C
A;B
要求：统计每行内相邻字符串对出现的次数（不跨行组合），结果形如 (A+B, 2)、(B+C, 1)……
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
// NOTE(review): the code below uses only the batch DataSet API; its implicit
// type-information import is normally org.apache.flink.api.scala._ rather than
// the streaming package. If switching, replace this line instead of adding a
// second wildcard import next to it — confirm against the project's Flink setup.
import org.apache.flink.streaming.api.scala._

/**
 * Created by angel;
 */
object demo {

  /**
   * Counts how often each pair of adjacent tokens occurs in the exercise data:
   *
   * {{{
   * A;B;C;D;B;D;C
   * B;D;A;E;D;C
   * A;B
   * }}}
   *
   * Pairs are formed only inside a single record (line), never across record
   * boundaries. The printed result contains tuples such as `(A+B,2)` and
   * `(B+C,1)`.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // One element per input line. Concatenating the lines into a single string
    // would invent pairs that straddle line boundaries (e.g. "C+B", "C+A").
    val lines: DataSet[String] = env.fromElements(
      "A;B;C;D;B;D;C",
      "B;D;A;E;D;C",
      "A;B"
    )

    // "A;B;C" -> Array(A, B, C)
    val tokens: DataSet[Array[String]] = lines.map(line => line.split(";"))

    // Array(A, B, C) -> (A+B,1), (B+C,1).
    // A record with fewer than two tokens yields only a short window, which the
    // pattern below drops, so such records contribute no pairs.
    val pairOnes: DataSet[(String, Int)] = tokens.flatMap { record =>
      record.sliding(2).collect { case Array(left, right) => (s"$left+$right", 1) }
    }

    // Group by the pair key (field 0) and add up the ones (field 1).
    val pairCounts: DataSet[(String, Int)] = pairOnes.groupBy(0).sum(1)
    pairCounts.print()
  }
}