mysql source
1 import java.sql.{Connection, DriverManager, PreparedStatement} 2 3 import org.apache.flink.configuration.Configuration 4 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} 5 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} 6 7 object FlinkDemo06_CustomSource_mysql { 8 def main(args: Array[String]): Unit = { 9 //1 环境 10 val env = StreamExecutionEnvironment.getExecutionEnvironment 11 //2 流对象 12 import org.apache.flink.api.scala._ 13 val dStream: DataStream[Flight] = env.addSource(new MySqlSource) 14 //3 计算 统计次数 15 dStream.map(k=>(1,1)).keyBy(0).sum(1).print() 16 //4 执行 17 env.execute("custom mysql source") 18 } 19 20 class MySqlSource extends RichSourceFunction[Flight] { 21 private var connection: Connection = null 22 private var ps: PreparedStatement = null 23 24 override def open(parameters: Configuration): Unit = { 25 val driver = "com.mysql.jdbc.Driver" 26 val url = "jdbc:mysql://localhost:3306/test_db" 27 val username = "root" 28 Class.forName(driver) 29 connection = DriverManager.getConnection(url, username, "123456") 30 val sql = "select id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID from flight" 31 ps = connection.prepareStatement(sql) 32 } 33 34 override def close(): Unit = { 35 if (connection != null) { 36 connection.close() 37 } 38 if (ps != null) { 39 ps.close() 40 } 41 } 42 43 override def run(ctx: SourceFunction.SourceContext[Flight]): Unit = { 44 //获取结果集,遍历并输出,关闭结果集 45 val rs = ps.executeQuery() 46 while(rs.next()) { 47 val flight = Flight(rs.getFloat("avgTicketPrice"),rs.getString("cancelled"),rs.getString("carrier"),rs.getString("dest"),rs.getString("destAirportID"),rs.getString("origin"),rs.getString("originAirportID")) 48 ctx.collect(flight) 49 } 50 rs.close() 51 } 52 53 override def cancel(): Unit = {} 54 } 55 56 case class Flight(avgTicketPrice: Float, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String) 57 }