【发布时间】:2019-09-20 23:48:01
【问题描述】:
我有一个 .csv 文件,我正在尝试使用 spark 进行分析。 .csv 文件包含主题列表及其计数等。主题及其计数用“,”分隔,所有这些主题+计数都在同一个字符串中,用“;”分隔像这样
"topic_1,10;topic_2,12;topic_1,3"
如您所见,某些主题多次出现在字符串中。
我有一个 rdd,其中包含一些日期的键值对和主题字符串 [日期,主题字符串]
我想做的是在';'处拆分字符串获取所有单独的主题,然后在','处拆分这些主题,并创建主题名称和计数的键值对,可以通过键减少。对于上面的示例,这将是
[date, ((topic_1, 13), (topic_2, 12))]
所以我一直在玩 spark 很多,因为我是 scala 的新手。我试图做的是
val separateTopicsByDate = topicsByDate
.mapValues(_.split(";").map({case(str) => (str)}))
.mapValues({case(arr) => arr
.filter(str => str.split(",").length > 1)
.map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
})
问题是这会返回一个元组数组,我不能 reduceByKey。当我在';'处拆分字符串时这将返回一个数组。我尝试将它映射到一个元组(从映射操作中可以看到),但这不起作用。
我使用的完整代码是
val rdd = sc.textFile("./data/segment/*.csv")
val topicsByDate = rdd
.filter(line => line.split("\t").length > 23)
.map({case(str) => (str.split("\t")(1), str.split("\t")(23))})
.reduceByKey(_ + _)
val separateTopicsByDate = topicsByDate
.mapValues(_.split(";").map({case(str) => (str)}))
.mapValues({case(arr) => arr
.filter(str => str.split(",").length > 1)
.map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
})
separateTopicsByDate.take(2)
这会返回
res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...
如您所见,这是一个元组数组,我无法在其上使用 .reduceByKey(_ + _)。
有没有办法将字符串拆分为可以通过键减少的方式?
【问题讨论】:
标签: scala apache-spark