代码如下:
// Copies the MySQL table `usppa_twitter_data` into the Hive table
// `testwarehouse.testtt` using Spark's JDBC data source with a partitioned read.

// Spark configuration: application name, run against a "local" master.
val conf = new SparkConf()
  .setAppName("testMysqlToHiveJdbc")
  .setMaster("local")

// Hive support is required so that saveAsTable below targets the Hive metastore.
val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()

// JDBC URL of the source MySQL database.
val mysqlConnectionUrl = "jdbc:mysql://localhost:3306/rest"

// Sub-query passed to Spark in place of a plain table name. It selects every
// column of usppa_twitter_data and adds a synthetic column `par`:
//   par = 1..5 for ids in consecutive ranges of 4,000,000 starting at 0,
//   par = 6    for every other id (the `else` branch).
// `par` is then used as the partition column for the parallel read.
// The literal is split only for readability; the pieces concatenate to a
// single-line query.
val mysqlTableName =
  "(select t.*, case " +
    "when id<4000000 and id >=0 then 1 " +
    "when id<8000000 and id >=4000000 then 2 " +
    "when id<12000000 and id >=8000000 then 3 " +
    "when id<16000000 and id >=12000000 then 4 " +
    "when id<20000000 and id >=16000000 then 5 " +
    "else 6 end par " +
    "from usppa_twitter_data t) tt"
// Alternative: read the bare table (no synthetic partition column).
// val mysqlTableName = "usppa_twitter_data"

// Connection and read options handed to the JDBC reader.
// setProperty is used instead of put so that only String keys/values can be stored.
val mysqlProperties = new java.util.Properties()
mysqlProperties.setProperty("driver", "com.mysql.jdbc.Driver") // JDBC driver class
// NOTE(review): credentials are hard-coded; consider loading them from configuration.
mysqlProperties.setProperty("user", "root")                    // user name
mysqlProperties.setProperty("password", "1234")                // password
mysqlProperties.setProperty("fetchsize", "10000")              // rows fetched per batch
// Partitioned-read settings. The bounds cover the values of `par` (1..6);
// presumably each value ends up in its own partition with numPartitions = 6 —
// confirm against the Spark JDBC data source documentation for the Spark version in use.
mysqlProperties.setProperty("lowerBound", "1")                 // lower bound of partition column
mysqlProperties.setProperty("upperBound", "7")                 // upper bound of partition column
mysqlProperties.setProperty("numPartitions", "6")              // number of partitions
mysqlProperties.setProperty("partitionColumn", "par")          // partition column

// Read the data from MySQL.
val re = spark.read.jdbc(mysqlConnectionUrl, mysqlTableName, mysqlProperties)

// Write the result into the Hive table, replacing any existing contents.
re.write.mode("overwrite").saveAsTable("testwarehouse.testtt")