【Question Title】: (Elasticsearch & Spark) How to transform JSON data read from Elasticsearch into columns for SQL queries
【Posted】: 2017-04-21 16:13:02
【Question】:

I'm still quite new to Spark and Elasticsearch.

I'm currently trying to import data from Elasticsearch.

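import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("test")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes.discovery", "true")
conf.set("es.nodes", "localhost:9200")
// Create the SparkContext from this conf so the es.* settings are actually applied
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Read index pattern "test-*", type "Things", through the elasticsearch-hadoop connector
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("test-*/Things")
df.registerTempTable("tmpT")
df.printSchema()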

With the code above, the schema of the DataFrame df looks like this:

root
 |-- @timestamp: timestamp (nullable = true)
 |-- @version: string (nullable = true)
 |-- eventJ: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- process_id: string (nullable = true)
 |    |-- system_id: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- ua_type: string (nullable = true)
 |    |-- uc_type: string (nullable = true)
 |    |-- ud_type: string (nullable = true)
 |    |-- um_type: string (nullable = true)
 |    |-- us_type: string (nullable = true)
 |-- headerJ: struct (nullable = true)
 |    |-- endpointKeyHash: struct (nullable = true)
 |    |    |-- string: string (nullable = true)
 |    |-- timestamp: struct (nullable = true)
 |    |    |-- long: long (nullable = true)
 |-- type: string (nullable = true)
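
Because every nested level in this schema is a StructType, the columns can also be flattened generically by walking df.schema. A minimal sketch, assuming Spark 2.x; the object FlattenAll and its underscore naming scheme (for example headerJ_timestamp_long) are hypothetical choices, not part of Spark or the Elasticsearch connector:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Hypothetical helper: turns every leaf field of nested structs into a top-level
// column, e.g. headerJ.timestamp.long becomes headerJ_timestamp_long.
object FlattenAll {
  // Walk the schema recursively and collect one aliased Column per leaf field.
  def leafColumns(schema: StructType, prefix: Seq[String] = Nil): Seq[Column] =
    schema.fields.toSeq.flatMap { field =>
      val path = prefix :+ field.name
      field.dataType match {
        case nested: StructType => leafColumns(nested, path)
        // Backticks protect names such as "@timestamp" inside the column path
        case _ => Seq(col(path.map(p => s"`$p`").mkString(".")).as(path.mkString("_")))
      }
    }

  def flatten(df: DataFrame): DataFrame = df.select(leafColumns(df.schema): _*)
}
```

Joining the path with an underscore keeps eventJ.type (eventJ_type) and the top-level type column from colliding.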

To look at the data in the DataFrame, I run:

df.show()

+--------------------+--------+--------------------+--------------------+---------+
|          @timestamp|@version|              eventJ|             headerJ|     type|
+--------------------+--------+--------------------+--------------------+---------+
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:21:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
+--------------------+--------+--------------------+--------------------+---------+

If I run

df.toJSON.foreach(a => println(a))

it prints the following. I think it just converts the structured JSON data into strings:

{"@timestamp":"2017-04-20T08:58:53.189+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.112:17500 to 172.16.1.255:17500 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646333189}},"type":"Things"}
{"@timestamp":"2017-04-20T08:59:51.434+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.1:38329 to 239.255.255.250:1900 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646391435}},"type":"Things"}
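
toJSON only serializes each Row into one JSON string for printing; the DataFrame itself still holds eventJ and headerJ as struct columns, which is what makes column.field access possible. A small sketch to confirm this from the schema, assuming Spark 2.x; the object name StructColumns is hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical helper: list each top-level struct column and the names of its fields,
// i.e. the paths that can be addressed as column.field in select() or in SQL.
object StructColumns {
  def describe(df: DataFrame): Map[String, Seq[String]] =
    df.schema.fields.collect {
      case StructField(name, nested: StructType, _, _) => name -> nested.fieldNames.toSeq
    }.toMap
}
```

For the schema printed earlier, StructColumns.describe(df) should map eventJ to its eleven fields and headerJ to endpointKeyHash and timestamp.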

What I want is to convert the DataFrame (df) into a format where each field of the JSON data becomes its own column, perhaps like this:

    action      code    process_id  system_id   ...
--------------------------------------------------------
    ACCEPT...   1702    1           ESG         ...
    ACCEPT...   1702    1           ESG         ...

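How can I convert the DataFrame read from Elasticsearch into a proper column format like the one above, so that I can run SQL queries on it with Spark SQL? Any ideas are very welcome. Thanks in advance.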
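
Spark SQL can read struct fields with dot notation, so the temporary table tmpT registered in the code above can be queried without converting anything first. A minimal sketch, assuming Spark 2.x and that tmpT is registered as in the question; the object name EventJSql is hypothetical:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper: query nested eventJ fields straight from the temp table "tmpT",
// aliasing each struct field to a flat column name.
object EventJSql {
  val query: String =
    """SELECT eventJ.action     AS action,
      |       eventJ.code       AS code,
      |       eventJ.process_id AS process_id,
      |       eventJ.system_id  AS system_id
      |FROM tmpT""".stripMargin

  def run(sqlContext: SQLContext): DataFrame = sqlContext.sql(query)
}
```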

【Comments】:

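  • I'm not sure I understand your question. Do you need to format the DataFrame as an HTML table? Or do you want to extract the JSON object in column eventJ into a DataFrame with a schema similar to that HTML table?
  • I just want to extract eventJ and split its fields into database columns.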

Tags: apache-spark apache-spark-sql spark-dataframe


【Solution 1】:

You can select the fields of the eventJ struct directly:

val eventJDf = df.select(df.col("eventJ.action"), df.col("eventJ.code"), df.col("eventJ.process_id"), df.col("eventJ.system_id") /*...*/)
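
If you want every field rather than a hand-picked list, the struct can be expanded with eventJ.*. A minimal sketch, assuming Spark 2.x (createOrReplaceTempView does not exist in 1.6, where registerTempTable plays the same role); the object name EventJColumns and the view name are hypothetical:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: expand every field of the eventJ struct into its own column,
// instead of listing eventJ.action, eventJ.code, ... one by one.
object EventJColumns {
  def flatten(df: DataFrame): DataFrame = df.select("eventJ.*")

  // Register the flat result so it can be queried with plain column names.
  def register(df: DataFrame, viewName: String): Unit =
    flatten(df).createOrReplaceTempView(viewName)
}
```

After EventJColumns.register(df, "events"), a query such as SELECT action, code FROM events works with plain column names.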

【Comments】:
