【发布时间】:2017-12-04 00:58:52
【问题描述】:
我正在使用 Spark 2.1.1 和 Scala 2.11.8。
我必须从 csv 文件中读取数据,其列的范围从最小 6 到最大 8。我必须拆分 9 个条目,一旦拆分,第 0 到 5 列将始终包含数据。但是,第 6 到第 8 列中可能存在或不存在数据。我使用以下方法将所需的列分隔并存储在 RDD 中:
val read_file = sc.textFile("Path to input file");
val uid = read_file.map(line => {var arr = line.split(","); (arr(2).split(":")(0),arr(3),arr(4).split(":")(0),arr(5).split(":")(0),arr(6).split(":")(0),arr(7).split(":")(0),arr(8).split(":")(0))})
现在,在获得的 RDD 'uid' 中,第 0 到第 3 列将始终被填充,但第 4 到第 7 列可能有数据,也可能没有数据。例如:我从中读取数据的 csv 文件,
2017-05-09 21:52:42 , 1494391962 , p69465323_serv80i:10:450 , 7 , fb_406423006398063:396560, guest_861067032060185_android:671051, fb_100000829486587:186589, fb_100007900293502:407374, fb_172395756592775:649795
2017-05-09 21:52:42 , 1494391962 , z67265107_serv77i:4:45 , 2:Re , fb_106996523208498:110066, fb_274049626104849:86632, fb_111857069377742:69348, fb_127277511127344:46246
2017-05-09 21:52:42 , 1494391962 , v73392772_serv33i:9:1400 , 1:4x , c2eb11fd-99dc-4dee-a75c-bc9bfd2e0ae4iphone:314129, fb_217409795286934:294262
可以看出,第一个条目已填充所有 9 列,第二个条目已填充 8 个,第三个条目仅填充了 6 列。
从获得的 RDD 中,我必须将列 arr(1)(0) 与列 arr(3)(0) 映射到 arr(7)(0)。列 1 的映射应该只对填充列进行从 3 到 7。3 到 7 之间的空列不必与第 1 列映射。我试图使用 for 循环来做到这一点:
一旦我在执行语句 val uid = read_file.map() 后得到了这个:
(String, String, String, String, String, String, String) = (" p69465323_serv80i"," 7 "," fb_406423006398063"," guest_861067032060185_android"," fb_100000829486587"," fb_100007900293502"," fb_172395756592775")
我愿意:
for (var x <= 5 to 7) { if var arr => (arr(x) != null) {
val pairedRdd = uid.map(x => ((x._1, x._3), (x._1, x._4), (x._1, x._5), (x._1, x._6), (x._1, x._7)) ) }
这将适用于给定数据示例中的第一条语句,但不适用于第二条和第三条。
我承认逻辑是错误的,但这只是为了传达我正在尝试做的事情的想法。
P.S : 不允许使用 Spark SQL。
【问题讨论】:
-
缺少的列,它们是否按顺序排列?我的意思是说,当缺少一列时,列号是第 7 列,对吗?
-
正确。缺少的列将始终按顺序排列。
-
您有 9 列。您能否查看您的问题以更新不正确的信息?
-
完成。我已经提到,第二行本身的列从 0 开始。列从 0 到 8。
-
您所需输出的样本也应该有所帮助:)
标签: arrays scala csv apache-spark mapping