【Posted】: 2016-04-15 07:22:28
【Question】:
My RDD consists of \n-separated records that look like this:
Single RDD
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
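I want to convert it into an Array[Map[k,v]], where each element of the Array is a separate Map[k,v] corresponding to one record.
The Array will contain N such maps, depending on how many records the single RDD holds.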
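A minimal sketch of the per-record conversion in plain Scala, with no Spark dependency, assuming ',' separates the pairs and the first '=' separates a key from its value. The names RecordParser, parseRecord and toMaps are hypothetical. In a Spark job, the same parseRecord function could be passed to map on an RDD[String], and RDD.collect() would then return an Array[Map[String, String]] on the driver.

```scala
object RecordParser {
  // Parses one record such as "k1=v1,k2=v2,k3=v3" into Map(k1 -> v1, k2 -> v2, k3 -> v3).
  // Assumption: ',' separates pairs and the first '=' separates a key from its value.
  def parseRecord(line: String): Map[String, String] =
    line.split(",").iterator
      .map(_.split("=", 2))                              // limit 2 keeps any later '=' in the value
      .collect { case Array(k, v) => k.trim -> v.trim }  // fragments without '=' are skipped
      .toMap

  // Splits '\n'-separated text into records and parses each non-empty line,
  // giving one Map per record.
  def toMaps(text: String): Array[Map[String, String]] =
    text.split("\n").filter(_.trim.nonEmpty).map(parseRecord)
}
```

Splitting with a limit of 2 keeps any further '=' inside the value, and collect silently drops fragments that contain no '=' at all; whether skipping them is the right behavior depends on the data.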
I am new to both Scala and Spark. Any help with the conversion would be appreciated.
import java.io.File
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable

import com.typesafe.config.ConfigFactory
import kafka.serializer.StringDecoder
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

// GlobalConstants and filterNullImpressions come from elsewhere in the project (not shown).
object SparkApp extends Logging with App {
  override def main(args: Array[String]): Unit = {
    // Load job settings from the config file
    val myConfigFile = new File("../sparkconsumer/conf/spark.conf")
    val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT)
    val propConf = ConfigFactory.load(fileConfig)
    val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS))
    //logger.info(message = "Hello World , You are entering Spark!!!")
    val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME))
    conf.set("HADOOP_HOME", "/usr/local/hadoop")
    conf.set("hadoop.home.dir", "/usr/local/hadoop")
    //Lookup
    // logger.info("Window of 5 Seconds Enabled")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/checkpoint")
    val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE))
    val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect())
    val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE))
    val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect())
    // Receive (key, value) pairs from Kafka with the direct API
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
    writeTOHDFS2(messages)
    ssc.start()
    ssc.awaitTermination()
  }

  def writeTOHDFS2(messages: DStream[(String, String)]): Unit = {
    // 10-second windows sliding every 10 seconds, i.e. non-overlapping
    val records = messages.window(Seconds(10), Seconds(10))
    // Keep only the message values and drop null impressions
    val k = records.transform(rdd => rdd.map(r => r._2)).filter(x => filterNullImpressions(x))
    k.foreachRDD { singleRdd =>
      if (singleRdd.count() > 0) {
        // Attempted conversion of each record into a map
        val maps = singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap(x => x.split("=")).foreach(x => new mutable.HashMap().put(x(0), x(1))))
        val r = scala.util.Random
        val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
        maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date()) + r.nextInt)
      }
    }
  }
}
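The maps line in writeTOHDFS2 loses the key/value pairing: flatMap(x => x.split("=")) flattens keys and values into one sequence, and foreach returns Unit, so the result is an RDD[Unit] rather than maps. The sketch below reproduces this with a local Seq[String] standing in for singleRdd, under the assumption that one Kafka message value may hold several '\n'-separated records. MessageParsingDemo, flattened and toMaps are hypothetical names.

```scala
object MessageParsingDemo {
  // Hypothetical local stand-in for singleRdd: each element is one Kafka message value,
  // assumed here to hold one or more '\n'-separated records.
  val messages: Seq[String] = Seq("k1=v1,k2=v2\nk1=v3,k2=v4", "k1=v5,k2=v6")

  // Equivalent to the chain in the question, minus the trailing foreach:
  // splitting on '=' inside flatMap merges keys and values into one flat sequence.
  def flattened(msg: String): Seq[String] =
    msg.split("\n").toSeq.flatMap(_.split(",").toSeq).flatMap(_.split("=").toSeq)

  // Corrected chain: one element per record, then one Map per record.
  def toMaps(msgs: Seq[String]): Seq[Map[String, String]] =
    msgs
      .flatMap(_.split("\n").toSeq)   // one element per record
      .filter(_.trim.nonEmpty)
      .map { record =>
        record.split(",").toSeq
          .map(_.split("=", 2))       // keep key and value together as a pair
          .collect { case Array(k, v) => k -> v }
          .toMap
      }
}
```

RDD[String] offers flatMap, filter and map with the same shapes, so the body of toMaps could be applied to singleRdd, and collect() on the result returns the Array[Map[String, String]] asked for. Note that collect() brings the whole window to the driver; for writing to HDFS it may be simpler to call saveAsTextFile on the mapped RDD directly.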
【Discussion】:
-
What is the format inside your RDD? Is it (k1,v1),(k2,v2), etc.?
-
Welcome to StackOverflow :) Could you include what you have tried? That would make it easier for us to understand which concept you are struggling with.
-
This looks fairly straightforward. What have you tried? How many lines are in your RDD, and do you really want an RDD[Map[...]]?
Tags: scala apache-spark