mysql source

 1 import java.sql.{Connection, DriverManager, PreparedStatement}
 2 
 3 import org.apache.flink.configuration.Configuration
 4 import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
 5 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 6 
 7 object FlinkDemo06_CustomSource_mysql {
 8     def main(args: Array[String]): Unit = {
 9         //1 环境
10         val env = StreamExecutionEnvironment.getExecutionEnvironment
11         //2 流对象
12         import org.apache.flink.api.scala._
13         val dStream: DataStream[Flight] = env.addSource(new MySqlSource)
14         //3 计算 统计次数
15         dStream.map(k=>(1,1)).keyBy(0).sum(1).print()
16         //4 执行
17         env.execute("custom mysql source")
18     }
19     
20     class MySqlSource extends RichSourceFunction[Flight] {
21         private var connection: Connection = null
22         private var ps: PreparedStatement = null
23         
24         override def open(parameters: Configuration): Unit = {
25             val driver = "com.mysql.jdbc.Driver"
26             val url = "jdbc:mysql://localhost:3306/test_db"
27             val username = "root"
28             Class.forName(driver)
29             connection = DriverManager.getConnection(url, username, "123456")
30             val sql = "select id,avgTicketPrice,cancelled,carrier,dest,destAirportID,origin,originAirportID from flight"
31             ps = connection.prepareStatement(sql)
32         }
33         
34         override def close(): Unit = {
35             if (connection != null) {
36                 connection.close()
37             }
38             if (ps != null) {
39                 ps.close()
40             }
41         }
42         
43         override def run(ctx: SourceFunction.SourceContext[Flight]): Unit = {
44             //获取结果集,遍历并输出,关闭结果集
45             val rs = ps.executeQuery()
46             while(rs.next()) {
47                 val flight = Flight(rs.getFloat("avgTicketPrice"),rs.getString("cancelled"),rs.getString("carrier"),rs.getString("dest"),rs.getString("destAirportID"),rs.getString("origin"),rs.getString("originAirportID"))
48                 ctx.collect(flight)
49             }
50             rs.close()
51         }
52         
53         override def cancel(): Unit = {}
54     }
55     
56     case class Flight(avgTicketPrice: Float, cancelled: String, carrier: String, dest: String, destAirportID: String, origin: String, originAirportID: String)    
57 }
View Code

相关文章:

  • 2021-08-16
  • 2021-12-14
  • 2021-11-06
  • 2022-12-23
  • 2021-07-05
  • 2022-12-23
猜你喜欢
  • 2022-01-20
  • 2022-12-23
  • 2021-11-06
  • 2021-12-16
  • 2021-06-28
  • 2021-08-10
  • 2021-05-30
相关资源
相似解决方案