【Question Title】: How to get a multi-line JSON file into a single record in an RDD
【Posted】: 2015-08-07 16:23:15
【Question】:
rdd = sc.textFile(path)  # path to a JSON (or XML) file
rdd.collect()

[u'{', u'    "glossary": {', u'        "title": "example glossary",', u'\t\t"GlossDiv": {', u'            "title": "S",', u'\t\t\t"GlossList": {', u'                "GlossEntry": {', u'                    "ID": "SGML",', u'\t\t\t\t\t"SortAs": "SGML",', u'\t\t\t\t\t"GlossTerm": "Standard Generalized Markup Language",', u'\t\t\t\t\t"Acronym": "SGML",', u'\t\t\t\t\t"Abbrev": "ISO 8879:1986",', u'\t\t\t\t\t"GlossDef": {', u'                        "para": "A meta-markup language, used to create markup languages such as DocBook.",', u'\t\t\t\t\t\t"GlossSeeAlso": ["GML", "XML"]', u'                    },', u'\t\t\t\t\t"GlossSee": "markup"', u'                }', u'            }', u'        }', u'    }', u'}', u'']

But I want the output to have the whole thing on one line:

{"glossary": {"title": "example glossary","GlossDiv": {"title": "S","GlossList":.....}}
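If the file is small enough to collect to the driver, the list of lines returned by `collect()` can simply be joined back together and re-serialized with Python's standard `json` module. A minimal sketch, assuming the whole document fits in driver memory; the helper name `lines_to_single_record` is hypothetical:

```python
import json

def lines_to_single_record(lines):
    """Rejoin the per-line strings from textFile(...).collect() and return
    the whole JSON document as a single line."""
    document = "\n".join(lines)   # restore the original multi-line text
    parsed = json.loads(document)  # parse the complete document (tabs/newlines are valid JSON whitespace)
    return json.dumps(parsed)      # json.dumps without indent emits one line

lines = ['{', '    "glossary": {', '        "title": "example glossary"', '    }', '}', '']
print(lines_to_single_record(lines))
```

This does not scale: everything passes through the driver, which is exactly what the Spark-side answers below avoid.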

【Discussion】:

    Tags: apache-spark apache-spark-sql


    【Answer 1】:

    I'd suggest using Spark SQL's JSON support and then calling toJSON when saving (see https://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets):

    val input = sqlContext.jsonFile(path)
    val output = input...
    output.toJSON.saveAsTextFile(outputPath)
    

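    However, if your JSON records can't be parsed by Spark SQL because they span multiple lines (or for some other reason), we can take an example from the Learning Spark book (admittedly I'm a bit biased as a co-author) and modify it to use wholeTextFiles: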

    import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class Person(name: String, lovesPandas: Boolean)
    // Read the input and throw away the file names
    val input = sc.wholeTextFiles(inputFile).map(_._2)
    
    // Parse it into a specific case class. We use mapPartitions because:
    // (a) ObjectMapper is not serializable, so we would either need a singleton object encapsulating
    //     ObjectMapper on the driver (and send data back to the driver to go through it), or let each
    //     node create its own ObjectMapper, which is expensive if done per record in a map.
    // (b) To create an ObjectMapper on each node without that cost, we create one per partition
    //     with mapPartitions. This solves both serialization and the object-creation performance hit.
    val result = input.mapPartitions(records => {
        // mapper object created on each executor, once per partition
        val mapper = new ObjectMapper()
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        mapper.registerModule(DefaultScalaModule)
        // We use flatMap to handle errors
        // by returning an empty list (None) if we encounter an issue and a
        // list with one element if everything is ok (Some(_)).
        records.flatMap(record => {
          try {
            Some(mapper.readValue(record, classOf[Person]))
          } catch {
            case e: Exception => None
          }
        })
      }, true)
    // The mapper above only exists inside that closure, so build one per partition again for writing
    result.filter(_.lovesPandas).mapPartitions(people => {
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        people.map(mapper.writeValueAsString(_))
      }).saveAsTextFile(outputFile)
    
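The mapPartitions idea in the Scala code (build the parser once per partition, and turn a parse failure into an empty result instead of an exception) can be mirrored in plain Python, which makes the parsing logic testable outside Spark. A hedged sketch: `parse_partition` and `pandas_lovers_as_lines` are hypothetical helpers, and a `json.JSONDecoder` stands in for Jackson's ObjectMapper purely for illustration (Python's json module does not actually need a per-partition object).

```python
import json

def parse_partition(records):
    """Parse each whole-file string in one partition; skip unparseable ones."""
    decoder = json.JSONDecoder()  # created once per partition, like the ObjectMapper
    for record in records:
        try:
            yield decoder.decode(record)
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            continue        # same effect as returning None in the Scala flatMap

def pandas_lovers_as_lines(records):
    """Keep people whose lovesPandas is true and emit one JSON line each."""
    for person in parse_partition(records):
        if isinstance(person, dict) and person.get("lovesPandas"):
            yield json.dumps(person)

files = [
    '{"name": "Alice",\n "lovesPandas": true}',
    '{not json',
    '{"name": "Bob", "lovesPandas": false}',
]
print(list(pandas_lovers_as_lines(files)))
```

In PySpark the same generator could be passed to `rdd.mapPartitions(pandas_lovers_as_lines)`, since mapPartitions accepts a function from an iterator to an iterable.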

    In Python:

    from pyspark import SparkContext
    import json
    import sys
    
    if __name__ == "__main__":
        if len(sys.argv) != 4:
            print("Error usage: LoadJson [sparkmaster] [inputfile] [outputfile]")
            sys.exit(-1)
        master = sys.argv[1]
        inputFile = sys.argv[2]
        outputFile = sys.argv[3]
        sc = SparkContext(master, "LoadJson")
        # wholeTextFiles yields (path, content) pairs; keep only the content
        contents = sc.wholeTextFiles(inputFile).values()
        # each file holds one JSON document, so map (not flatMap) gives one record per file
        data = contents.map(lambda x: json.loads(x))
        data.filter(lambda x: 'lovesPandas' in x and x['lovesPandas']).map(
            lambda x: json.dumps(x)).saveAsTextFile(outputFile)
        sc.stop()
        print("Done!")
    
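Whether to use `map` or `flatMap` after `wholeTextFiles` depends on what each file holds: a single JSON object should become one record, while a top-level JSON array can be flattened into one record per element (calling `flatMap` directly on a parsed object would instead iterate over its keys). A small sketch of a helper that handles both cases; `records_in_file` is a hypothetical name, and the `(path, content)` tuples below only imitate what `wholeTextFiles` returns:

```python
import json

def records_in_file(path_and_content):
    """Return the JSON records contained in one (path, content) pair as a list."""
    _path, content = path_and_content
    parsed = json.loads(content)
    # A top-level array holds many records; anything else is a single record.
    return parsed if isinstance(parsed, list) else [parsed]

pairs = [
    ("a.json", '{\n  "name": "Alice",\n  "lovesPandas": true\n}'),
    ("b.json", '[{"name": "Bob", "lovesPandas": false},\n {"name": "Carol", "lovesPandas": true}]'),
]
# Flatten the per-file lists, which is what flatMap would do across the RDD
records = [r for pair in pairs for r in records_in_file(pair)]
print([json.dumps(r) for r in records])
```

With Spark, the same helper could be applied as `sc.wholeTextFiles(inputFile).flatMap(records_in_file)`.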

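    【Discussion】:

    • If I have a multi-line file, I get the error: Please make sure that each line of the file (or each string in the RDD) is a valid JSON object or an array of JSON objects.
    • Oops, sorry about that. I'll update the answer with a solution for multi-line files.
    • Sure, I'll try it in the next few hours tonight.
    • If the multi-line JSON file is really large and I want to read it in parallel, can Spark do that? (If I understand correctly, this approach has to process the JSON in a single thread.)
    【Answer 2】: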

    Use sc.wholeTextFiles() instead.
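To see why `wholeTextFiles` helps, here is a Spark-free sketch of two views of the same file: `textFile` produces one element per line, while `wholeTextFiles` produces one `(path, content)` pair per file, so a multi-line document stays intact. The functions `like_text_file` and `like_whole_text_files` are hypothetical local stand-ins that only approximate the Spark readers; they are not Spark APIs.

```python
import json
import os
import tempfile

def like_text_file(path):
    """One string per line, similar to the elements of sc.textFile(path)."""
    with open(path) as f:
        return f.read().splitlines()

def like_whole_text_files(directory):
    """One (path, content) pair per file, similar to sc.wholeTextFiles(directory)."""
    pairs = []
    for name in sorted(os.listdir(directory)):
        full = os.path.join(directory, name)
        with open(full) as f:
            pairs.append((full, f.read()))
    return pairs

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "glossary.json")
    with open(path, "w") as f:
        f.write('{\n    "glossary": {\n        "title": "example glossary"\n    }\n}\n')
    lines = like_text_file(path)          # 5 fragments, none of them valid JSON alone
    pairs = like_whole_text_files(d)      # 1 pair holding the complete document
    one_line = [json.dumps(json.loads(content)) for _, content in pairs]

print(len(lines), len(pairs))
print(one_line)
```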

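    【Discussion】:

      【Answer 3】:

      【Discussion】:

      • I get the error: Please make sure that each line of the file (or each string in the RDD) is a valid JSON object or an array of JSON objects.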