由于文件太大,编辑器只能打开部分内容,我们只需要知道一行数据的格式即可。
下面采用逐行读取的方式处理文件,避免一次性加载导致 OOM。
@Test
public void testReadCsv() throws Exception{
// Imports a GBK-encoded temperature CSV into the database in batches.
// The file is far too large to load whole, so it is streamed line by line:
// FileInputStream -> GBK InputStreamReader -> Scanner acts as a pipe between
// the file and memory, holding at most one batch of rows at a time.
String path="D:\\workspace\\数据平台导入\\20200929\\temperture.csv";
// Flush a JDBC batch insert every 40,000 parsed rows.
final int batchSize = 40000;
List<TempertureData> lists = new LinkedList<>();
try (
FileInputStream inputStream = new FileInputStream(path);
InputStreamReader isr= new InputStreamReader(inputStream,"GBK");
Scanner sc = new Scanner(isr)) {
while (sc.hasNextLine()) {
String line = sc.nextLine();
// Columns are comma-separated and every value arrives wrapped in
// double quotes, so strip the quotes before splitting.
// (plain replace, not replaceAll — no regex needed for a literal)
String[] str = line.replace("\"", "").split(",");
// Guard against blank/truncated lines: the original code threw
// ArrayIndexOutOfBoundsException here and aborted the whole import.
if (str.length < 12) {
log.warn("skip malformed csv line: " + line);
continue;
}
// Map one CSV row onto the entity. Column layout (0-based):
// 0=id, 1=year, 2=month, 3=day, 4=time, 5=avg, 6=max, 7=min,
// 8-10=quality flags, 11=station id.
TempertureData tempertureData = new TempertureData();
tempertureData.setTempertureId(Long.valueOf(str[0]));
tempertureData.setYear(str[1]);
tempertureData.setMonth(str[2]);
tempertureData.setDay(str[3]);
tempertureData.setTime(str[4]);
// dataAnalys combines month+day into a single month_day key
// (helper defined elsewhere in this file — TODO confirm format).
tempertureData.setMonthDay(dataAnalys(str[2],str[3]));
tempertureData.setAveTem(str[5]);
tempertureData.setDayMaxTem(str[6]);
tempertureData.setDayMinTem(str[7]);
tempertureData.setQualityAveTem(str[8]);
tempertureData.setQualityDayMaxTem(str[9]);
tempertureData.setQuailtyDayMinTem(str[10]);
tempertureData.setStationId(str[11]);
lists.add(tempertureData);
// Every 40k rows: batch-insert and release the memory.
if (lists.size() >= batchSize){
AnalysisList(lists);
lists.clear();
}
}
// Insert whatever is left (< one full batch) so no rows are lost.
log.info("剩余数据也insert"+lists.size());
AnalysisList(lists);
}
// Note: no catch block — the original caught Exception and only called
// printStackTrace(), which made this test "pass" even when the import
// failed. A test should fail loudly; the method already declares
// `throws Exception`.
}
// Batch-inserts the given rows into temperture_data via plain JDBC.
// rewriteBatchedStatements=true in the URL lets the MySQL driver collapse
// the batch into multi-row INSERTs, which is what makes this fast.
public static void AnalysisList( List<TempertureData> lists){
log.info("执行插入操作开始:===============list长度为"+lists.size());
// Nothing to insert — skip the connection round trip entirely.
if (lists.isEmpty()) {
return;
}
String sql="insert into temperture_data (temperture_id,year,month,day,time,month_day,ave_tem,day_max_tem,day_min_tem,quality_ave_tem,quality_day_max_tem,quailty_day_min_tem,station_id) " +
"values(?,?,?,?,?,?,?,?,?,?,?,?,?)";
try {
// 1. Register the driver (harmless no-op on modern JDBC, kept for safety).
Class.forName("com.mysql.cj.jdbc.Driver");
// 2./3. try-with-resources guarantees connection and statement are
// closed even when executeBatch throws — the original leaked both
// on any failure.
try (Connection con = DriverManager.getConnection(
"jdbc:mysql://***:3306/dataplatform?useUnicode=true&characterEncoding=utf-8&useSSL=no&rewriteBatchedStatements=true",
"root", "***");
PreparedStatement pstmt = con.prepareStatement(sql)) {
// Bind each row; parameter indexes are 1-based and follow the
// column order of the INSERT above.
for (TempertureData tempertureData :lists){
pstmt.setLong(1, tempertureData.getTempertureId());
pstmt.setString(2, tempertureData.getYear());
pstmt.setString(3, tempertureData.getMonth());
pstmt.setString(4, tempertureData.getDay());
pstmt.setString(5, tempertureData.getTime());
pstmt.setString(6, tempertureData.getMonthDay());
pstmt.setString(7, tempertureData.getAveTem());
pstmt.setString(8, tempertureData.getDayMaxTem());
pstmt.setString(9, tempertureData.getDayMinTem());
pstmt.setString(10, tempertureData.getQualityAveTem());
pstmt.setString(11, tempertureData.getQualityDayMaxTem());
pstmt.setString(12, tempertureData.getQuailtyDayMinTem());
pstmt.setString(13, tempertureData.getStationId());
pstmt.addBatch();
}
int [] counts = pstmt.executeBatch();
log.info("执行条数:"+counts.length);
}
} catch (Exception e) {
// Surface the failure with its cause instead of printStackTrace():
// the original swallowed the error and silently dropped up to 40k
// rows per failed batch. RuntimeException keeps the signature
// backward-compatible (no new checked exception).
throw new RuntimeException("batch insert into temperture_data failed", e);
}
}
解析完毕,一共 11,568,000 条数据。