【问题标题】:spark non json to json and to dataframe error火花非json到json和数据框错误
【发布时间】:2019-02-06 16:27:46
【问题描述】:

我有一个 json 类型文件(不是真正的 json 结构),但我转换为 json 并通过 spark read json 读取(我们在 spark 1.6.0 中),我还不能使用 spark 2 的多行功能。它显示结果,但同时出错。非常感谢任何帮助。

我有这样的文档 .. 只举了一个例子,但它是一个数组:

$result = [
            {
              'name' => 'R-2018:1583',
              'issue_date' => '2018-05-17 02:51:06',
              'type' => 'Product Enhancement Advisory', 
              'last_modified_date' => '2018-05-17 03:51:00',
              'id' => 273,
              'update_date' => '2018-05-17 02:51:06',
              'synopsis' => ' enhancement  update',
              'advory' => 'R:1583'
            }
                ]

我是这样用的:

jsonRDD = sc.wholeTextFiles("/user/xxxx/aa.json").map(lambda x: x[1]).map(lambda x:x.replace('$result =','')).map(lambda x: x.replace("'",'"')).map(lambda x:x.replace("\n","")).map(lambda x:x.replace("=>",":")).map(lambda x:x.replace("  ",""))
sqlContext.read.json(rdd).show() 

它显示结果,但我也收到以下错误,请帮助解决这个问题。

18/08/31 11:19:30 WARN util.ExecutionListenerManager:执行查询执行侦听器时出错 java.lang.ArrayIndexOutOfBoundsException: 0 在 org.apache.spark.sql.query.analysis.QueryAnalysis$$anonfun$getInputMetadata$2.apply(QueryAnalysis.scala:121) 在 org.apache.spark.sql.query.analysis.QueryAnalysis$$anonfun$getInputMetadata$2.apply(QueryAnalysis.scala:108) 在 scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111) 在 scala.collection.immutable.List.foldLeft(List.scala:84) 在 org.apache.spark.sql.query.analysis.QueryAnalysis$.getInputMetadata(QueryAnalysis.scala:108) 在 com.cloudera.spark.lineage.ClouderaNavigatorListener.writeQueryMetadata(ClouderaNavigatorListener.scala:74) 在 com.cloudera.spark.lineage.ClouderaNavigatorListener.onSuccess(ClouderaNavigatorListener.scala:54) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:100) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1$$anonfun$apply$mcV$sp$1.apply(QueryExecutionListener.scala:99) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:121) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling$1.apply(QueryExecutionListener.scala:119) 在 scala.collection.immutable.List.foreach(List.scala:318) 在 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) 在 scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) 在 org.apache.spark.sql.util.ExecutionListenerManager.org$apache$spark$sql$util$ExecutionListenerManager$$withErrorHandling(QueryExecutionListener.scala:119) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply$mcV$sp(QueryExecutionListener.scala:99) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:99) 在 org.apache.spark.sql.util.ExecutionListenerManager$$anonfun$onSuccess$1.apply(QueryExecutionListener.scala:99) 在 org.apache.spark.sql.util.ExecutionListenerManager.readLock(QueryExecutionListener.scala:132) 在 org.apache.spark.sql.util.ExecutionListenerManager.onSuccess(QueryExecutionListener.scala:98) 在 org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2116) 在 org.apache.spark.sql.DataFrame.head(DataFrame.scala:1389) 在 org.apache.spark.sql.DataFrame.take(DataFrame.scala:1471) 在 org.apache.spark.sql.DataFrame.showString(DataFrame.scala:184) 在 sun.reflect.GeneratedMethodAccessor55.invoke(未知来源) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.lang.reflect.Method.invoke(Method.java:606) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 在 py4j.Gateway.invoke(Gateway.java:259) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:209) 在 java.lang.Thread.run(Thread.java:745)

【问题讨论】:

    标签: json apache-spark pyspark


    【解决方案1】:

    json函数将json文件的路径作为参数,所以你需要先将json保存在某个地方,然后再读取这个文件。

    类似的东西应该可以工作

    jsonRDD = sc.wholeTextFiles("/user/xxxx/aa.json")
                .map(lambda x: x[1])
                .map(lambda x:x.replace('$result =',''))
                .map(lambda x: x.replace("'",'"'))
                .map(lambda x:x.replace("\n",""))
                .map(lambda x:x.replace("=>",":"))
                .map(lambda x:x.replace("  ",""))
                .saveAsTextFile("/user/xxxx/aa_transformed.json") 
    sqlContext.read.json(jsonRDD).show() 
    

    【讨论】:

    • write.json 不起作用..我认为它仅适用于数据帧。下面的错误消息:AttributeError: 'PipelinedRDD' 对象没有属性 'write' 但是我使用了 jsonRDD.coalesce(1).saveAsTextFile("/user/xxxx/aa_transformed.json") 。效果很好。。谢谢
    • 有没有办法在不写入文件并在 Sqlcontext 中使用的情况下解决?
    猜你喜欢
    • 1970-01-01
    • 2016-07-09
    • 2020-06-21
    • 2023-04-03
    • 2019-03-19
    • 1970-01-01
    • 1970-01-01
    • 2016-11-13
    • 1970-01-01
    相关资源
    最近更新 更多