【问题标题】:Read from hdfs and write to MySQL从 hdfs 读取并写入 MySQL
【发布时间】:2021-10-19 07:29:18
【问题描述】:

我是大数据开发的新手。我有一个用例从 hdfs 读取数据,通过 spark 处理并保存到 MySQL db。保存到 MySQL db 的原因是报告工具指向 MySQL。 所以我想出了下面的流程来实现它。任何人都可以验证并建议需要任何优化/更改。

val df = spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema","true")
    .option("nullValue","NA")
    .option("mode","failfast")
    .load("hdfs://localhost:9000/user/testuser/samples.csv")  

val resultsdf = df.select("Sample","p16","Age","Race").filter($"Anatomy".like("BOT"))  

val prop=new java.util.Properties
prop.setProperty("driver", "com.mysql.cj.jdbc.Driver")  
prop.setProperty("user", "root")  
prop.setProperty("password", "pw")  
val url = "jdbc:mysql://localhost:3306/meta" 
 df.write.mode(SaveMode.Append).jdbc(url,"sample_metrics",prop)

【问题讨论】:

    标签: mysql scala apache-spark hdfs


    【解决方案1】:

    此行需要更改val resultdf= ...,您正在使用“解剖”列进行过滤,但您没有选择该列是选择子句。添加该列,否则您将得到错误-Analysis Exception unable to resolve column Anatomy.

    val resultsdf = df.select("Sample","p16","Age","Race", "Anatomy").filter($"Anatomy".like("BOT"))  
    

    优化: 您可以使用 numPartitionsbatchsize 等附加属性。 你可以阅读这些属性here

    【讨论】:

      猜你喜欢
      • 2018-10-24
      • 2018-01-31
      • 1970-01-01
      • 1970-01-01
      • 2018-12-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多