【问题标题】:Convert each record in RDD to a Array[Map] using scala and Spark使用 scala 和 Spark 将 RDD 中的每条记录转换为 Array[Map]
【发布时间】:2016-04-15 07:22:28
【问题描述】:

我的 RDD 是 \n 分隔的记录,看起来像

Single RDD

k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3
k1=v1,k2=v2,k3=v3

并希望将其转换为 Array[Map[k,v]],

其中 Array 中的每个元素将是对应于记录的不同 map[k,v]。

数组将包含 N 个这样的映射,具体取决于单个 RDD 中的记录。

我对 scala 和 spark 都是新手。转换中的任何帮助都会有所帮助。

object SparkApp  extends Logging with App {


  override def main(args: Array[ String ]): Unit = {
    val myConfigFile = new File("../sparkconsumer/conf/spark.conf")
    val fileConfig = ConfigFactory.parseFile(myConfigFile).getConfig(GlobalConstants.CONFIG_ROOT_ELEMENT)
    val propConf = ConfigFactory.load(fileConfig)
    val topicsSet = propConf.getString(GlobalConstants.KAFKA_WHITE_LIST_TOPIC).split(",").toSet
    val kafkaParams = Map[ String, String ]("metadata.broker.list" -> propConf.getString(GlobalConstants.KAFKA_BROKERS))


    //logger.info(message = "Hello World , You are entering Spark!!!")
    val conf = new SparkConf().setMaster("local[2]").setAppName(propConf.getString(GlobalConstants.JOB_NAME))
    conf.set("HADOOP_HOME", "/usr/local/hadoop")
    conf.set("hadoop.home.dir", "/usr/local/hadoop")
    //Lookup

    // logger.info("Window of 5 Seconds Enabled")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/tmp/checkpoint")

    val apiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.API_FILE))
    val arrayApi = ssc.sparkContext.broadcast(apiFile.distinct().collect())

    val nonApiFile = ssc.sparkContext.textFile(propConf.getString(GlobalConstants.NON_API_FILE))
    val arrayNonApi = ssc.sparkContext.broadcast(nonApiFile.distinct().collect())


    val messages = KafkaUtils.createDirectStream[ String, String, StringDecoder, StringDecoder ](ssc, kafkaParams, topicsSet)
    writeTOHDFS2(messages)
    ssc.start()
    ssc.awaitTermination()
  }



  def writeTOHDFS2(messages: DStream[ (String, String) ]): Unit = {
    val records = messages.window(Seconds(10), Seconds(10))
    val k = records.transform( rdd => rdd.map(r =>r._2)).filter(x=> filterNullImpressions(x))

    k.foreachRDD { singleRdd =>
      if (singleRdd.count() > 0) {


        val maps =  singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap(x => x.split("=")).foreach( x => new mutable.HashMap().put(x(0),x(1))) 


        val r = scala.util.Random
        val sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm")
        maps.saveAsTextFile("hdfs://localhost:8001/user/hadoop/spark/" + sdf.format(new Date())+r.nextInt)
      }
    }

  }

}

【问题讨论】:

  • 你的rdd里面的格式是什么?是(k1,v1),(k2,v2)等吗?
  • 欢迎来到 StackOverflow :) 您能否将您尝试过的内容包括在内 - 这将使我们更容易理解您正在努力解决的概念。
  • 这看起来很简单。你试过什么?你的 RDD 中有多少行 - 你真的想要 RDD[Map[...]] 吗?

标签: scala apache-spark


【解决方案1】:

这里有一些代码应该是不言自明的。

val lines = "k1=v1,k2=v2,k3=v3\nk1=v1,k2=v2\nk1=v1,k2=v2,k3=v3,k4=v4"

val maps = lines.split("\n")
 .map(line => line.split(",")
 .map(kvPairString => kvPairString.split("="))
 .map(kvPairArray => (kvPairArray(0), kvPairArray(1))))
 .map(_.toMap)

// maps is of type Array[Map[String, String]]

println(maps.mkString("\n"))

//  prints:
//  Map(k1 -> v1, k2 -> v2, k3 -> v3)
//  Map(k1 -> v1, k2 -> v2)
//  Map(k1 -> v1, k2 -> v2, k3 -> v3, k4 -> v4)

忠告 - SO 不是“为我编写代码”平台。我知道仅仅深入研究 Scala 和 Spark 非常困难,但下次请尝试自己解决问题,并发布到目前为止您尝试过的内容以及遇到的问题。

【讨论】:

  • 感谢 slouc,完全为我工作,没有任何改变。
  • 已经尝试了很多变体:singleRdd.map(line => line.split("\n").flatMap(x => x.split(",")).flatMap( x => x.split("=")).foreach( x => new mutable.HashMap().put(x(0),x(1)))
  • @krupavarughese 是的,好吧,写下你尝试过的东西不要感到尴尬 :) 它显示了善意,并将你与那些只是发布他们的问题并等待有人编写解决方案的用户区分开来(那些很快就会被否决)。
猜你喜欢
  • 2023-02-02
  • 1970-01-01
  • 1970-01-01
  • 2018-08-19
  • 2016-04-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多