【问题标题】:how to deserialize json in scala and java如何在scala和java中反序列化json
【发布时间】:2019-01-01 23:42:40
【问题描述】:

我是 scala 的新手。我需要连接到我的数据库并从表“queue”中选择一个名为“queue_message”的列。此列包含一个 json 架构:

{"LOG_ID":"2442204","CUSTOMER_CODE":"79D3QL","CFILE_WEIGHT":"1","PROVIDER_ID":"","FILETYPE_DIRECTORYFROM":"\\FromCustomer","FILE_CHARSET":"","CFILE_FORMAT":"CSV","FILE_NAME":"1475_18032018T164840_1.csv","FILETYPE_LABEL":"Order","FILE_ID":1475,"FILEFORMAT_CODE":"","CUSTOMER_ID":1016,"FILE_MASK":"wt_cde_*-*_*.csv"}

我需要在 scala 中反序列化此列(或在 java 中作为第二个选项),然后将另一个结构序列化为 json 格式。

这是我在 scala 中的代码:

package com.orienit.spark.training.sparkexamples

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import java.sql.DriverManager
import com.microsoft.sqlserver.jdbc
import org.apache.spark.rdd.JdbcRDD
import java.sql.ResultSet




object WordCount {
  def main(args: Array[String]){

val conf = new SparkConf()
 .setAppName("my first scala App")
 .setMaster("local")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val url =  "jdbc:sqlserver://localhost:1433;user=xsxx;password=xxx;databaseName=xxx"
val df = sqlContext


.read
   .format("jdbc")
   .option("url",url)
   .option("dbtable","(select top 1 queue_message from mq..queue where queuename_id = 4 order by queue_id desc) as sq")
   .load()

   df.show()
    println( df.collectAsList())
     }
}

这些是我在 scala 项目的 maven pom.xml 中使用的依赖项:

<dependencies>


<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.0</version>
</dependency>
 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.0</version>
</dependency>

<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>3.8.1</version>
  <scope>test</scope>
</dependency>

这是我在 java 中的代码:

package com.orienit.spark.training.javaJdbcConnectivity;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class WordCount {

public static void main(String[] args) {
    // TODO Auto-generated method stub
    SparkConf conf = new SparkConf().setMaster("local").setAppName("My app");
    JavaSparkContext sc  = new JavaSparkContext(conf);

    SQLContext sqlContext = new SQLContext(sc);

    Map<String, String> options = new HashMap<String, String>();




   options.put("url", "jdbc:sqlserver://localhost:1433;user=xsxx;password=xxx;databaseName=xxx");
options.put("dbtable", "(select top 1 queue_message from mq..queue where queuename_id = 4 order by queue_id desc) as sq");

Dataset<Row> df = sqlContext.read().format("jdbc"). options(options).load();
df.show();
System.out.println(df.collectAsList());
System.out.println(df.toJSON());


    }

}

这些是我的 java 项目的依赖项

    <dependencies>
  <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

谁能帮我序列化为 json 格式并从 json 格式反序列化,或者给我任何关于这个主题的相关文件。我没有在官方 spark 文档中找到对这位操作之王有帮助的内容。

非常感谢

【问题讨论】:

标签: java json scala apache-spark


【解决方案1】:

你可以在 spark 中使用 from_json 函数。

假设 json 的模式是“模式”,那么你可以简单地这样做:

import org.apache.spark.sql.functions.from_json
df.withColumn("deserialized", from_json($"queue_message", schema)

【讨论】:

  • 感谢阿萨夫;有没有办法从查询结果中自动检测或构造模式。这是 queue_message 内容的示例: {"LOG_ID":"3102116","PFILE_MATCHINGFIELD":"Delivery_Reference","PROVIDER_ID":42,"TRACE":{"TSTATUS_CODE":"MLRCFM","TRACE_HOUR":"03 :70","TRACE_DATE":"180530"},"MATCHINGFIELD":"114146490000000058905001001","CUSTOMER_ID":1051}。你能给我一个在这种情况下声明变量模式的例子吗?我试过 val schemaString = "LOG_ID;PFILE_MATCHINGFIELD;Delivery_Reference;PROVIDER_ID;TRACE;MATCHINGFIELD;CUSTOMER_ID" 但不起作用
  • 然后 val fields = schemaString.split(";") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) 因为在 Trace 我有TStatus_Code、Trace_Date 和 Trace_Hour
  • 这里有两个问题:第一个是json配置可能不是@user3569267在所有元素中都是常量(例如,如果缺少值会发生什么,因为其中一行为空) .第二个是分析字段就像你自己从from_json做的一样。即,您将创建一个 udf,该 udf 将返回包含数据的 Row。不过这样会更麻烦,性能也会更差。
猜你喜欢
  • 2012-09-17
  • 2016-10-30
  • 1970-01-01
  • 2017-08-02
  • 2011-07-09
  • 2014-07-02
  • 2013-03-01
  • 1970-01-01
  • 2012-12-19
相关资源
最近更新 更多