代码如下:

    // Copy a MySQL table into a Hive table through Spark's JDBC data source,
    // reading it in parallel over a synthetic partition column.

    // Local-mode Spark session with Hive support enabled so that the result
    // can be saved as a Hive table.
    val conf = new SparkConf()
      .setAppName("testMysqlToHiveJdbc")
      .setMaster("local")
    val spark = SparkSession.builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

    // JDBC URL of the source MySQL database.
    val mysqlConnectionUrl = "jdbc:mysql://localhost:3306/rest"

    // Subquery passed to Spark in place of a plain table name. It selects every
    // column of usppa_twitter_data plus a derived bucket column `par`:
    // ids in [0, 20000000) are split into five buckets of 4,000,000 ids each
    // (par = 1..5); every other id falls into bucket 6.
    val mysqlTableName = """(select t.*,
    case when id<4000000 and id >=0 then 1
            when id<8000000 and id >=4000000 then 2
            when id<12000000 and id >=8000000 then 3
            when id<16000000 and id >=12000000 then 4
            when id<20000000 and id >=16000000 then 5
    else 6 end par
         from usppa_twitter_data t) tt"""
    // Alternative: read the table directly, without the derived bucket column.
    //    val mysqlTableName = "usppa_twitter_data"

    // Connection and partitioning options for the JDBC read.
    // setProperty is used instead of the inherited Hashtable.put, which the JDK
    // documentation discourages because it accepts non-String keys and values.
    val mysqlProperties = new java.util.Properties()
    mysqlProperties.setProperty("driver", "com.mysql.jdbc.Driver") // JDBC driver class
    mysqlProperties.setProperty("user", "root")                    // user name
    mysqlProperties.setProperty("password", "1234")                // password
    // Number of rows requested per fetch.
    // NOTE(review): MySQL Connector/J may ignore the fetch size unless
    // useCursorFetch=true is set on the JDBC URL - confirm for the driver in use.
    mysqlProperties.setProperty("fetchsize", "10000")
    // Partitioned read over `par`. The bounds only determine the partition
    // stride; they do not filter rows. With bounds 1 and 7 and 6 partitions the
    // stride is 1, so each bucket value 1..6 is read by its own query.
    mysqlProperties.setProperty("lowerBound", "1")
    mysqlProperties.setProperty("upperBound", "7")
    mysqlProperties.setProperty("numPartitions", "6")
    mysqlProperties.setProperty("partitionColumn", "par")

    // Read the data from MySQL.
    val re = spark.read.jdbc(mysqlConnectionUrl, mysqlTableName, mysqlProperties)

    // Write the result into the Hive table, replacing any existing contents.
    // `re` is already a DataFrame, so no toDF() conversion is needed.
    re.write.mode("overwrite").saveAsTable("testwarehouse.testtt")
View Code

相关文章: