【Posted】: 2017-04-21 16:13:02
【Question】:
I'm still quite new to Spark and Elasticsearch.
Right now I'm trying to import data from Elasticsearch.
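import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("test")
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes.discovery", "true")
conf.set("es.nodes", "localhost:9200")
// Create the context from conf so that the es.* settings are applied
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// Load every index matching test-* with mapping type Things
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("test-*/Things")
df.registerTempTable("tmpT")
df.printSchema()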
With the code above, the schema of the DataFrame df looks like this:
root
 |-- @timestamp: timestamp (nullable = true)
 |-- @version: string (nullable = true)
 |-- eventJ: struct (nullable = true)
 |    |-- action: string (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- process_id: string (nullable = true)
 |    |-- system_id: string (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- ua_type: string (nullable = true)
 |    |-- uc_type: string (nullable = true)
 |    |-- ud_type: string (nullable = true)
 |    |-- um_type: string (nullable = true)
 |    |-- us_type: string (nullable = true)
 |-- headerJ: struct (nullable = true)
 |    |-- endpointKeyHash: struct (nullable = true)
 |    |    |-- string: string (nullable = true)
 |    |-- timestamp: struct (nullable = true)
 |    |    |-- long: long (nullable = true)
 |-- type: string (nullable = true)
To look at the data in the DataFrame, I run:
df.show()
+--------------------+--------+--------------------+--------------------+---------+
|          @timestamp|@version|              eventJ|             headerJ|     type|
+--------------------+--------+--------------------+--------------------+---------+
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:21:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:22:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:23:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:20:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
|2017-04-17 19:25:...|       1|[ACCEPT from from...|[[zxVuxVd6+3iOc4r...|ThingSPIN|
+--------------------+--------+--------------------+--------------------+---------+
And if I run:
df.toJSON.foreach(a => println(a))
it prints the following. As far as I can tell, this just serializes each structured row into a JSON string.
{"@timestamp":"2017-04-20T08:58:53.189+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.112:17500 to 172.16.1.255:17500 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646333189}},"type":"Things"}
{"@timestamp":"2017-04-20T08:59:51.434+09:00","@version":"1","eventJ":{"action":"ACCEPT from from 172.16.1.1:38329 to 239.255.255.250:1900 UDP","code":"1702","process_id":"1","system_id":"ESG","timestamp":"0000","type":"2","ua_type":"4","uc_type":"5","ud_type":"7","um_type":"3","us_type":"6"},"headerJ":{"endpointKeyHash":{"string":"zxVuxVd6+3iOc4raxk2yezQem3U="},"timestamp":{"long":1492646391435}},"type":"Things"}
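What I want is to convert the DataFrame (df) into a form where the fields of the nested JSON data are split out into separate columns, something like this: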
action code process_id system_id ...
--------------------------------------------------------
ACCEPT... 1702 1 ESG ...
ACCEPT... 1702 1 ESG ...
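How can I convert the DataFrame read from Elasticsearch into a flat format like the one above, so that I can run SQL queries on it in Spark SQL? Any ideas are very welcome. Thanks in advance.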
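For what it's worth, here is a minimal sketch of the flattening described above. It assumes Spark 2.2 or later (for `SparkSession`, `createOrReplaceTempView`, and `read.json` on a `Dataset[String]`) and uses a tiny in-memory DataFrame built from trimmed copies of the sample records, so it runs without Elasticsearch. The object name `FlattenEventJ`, the helper `flatten`, and the view name `events` are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FlattenEventJ {
  // Promote every field of the eventJ struct to its own top-level column.
  // "eventJ.*" is expanded by Spark into one column per struct field.
  def flatten(df: DataFrame): DataFrame = df.select("eventJ.*")

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; in the question, df comes from Elasticsearch.
    val spark = SparkSession.builder()
      .appName("flatten-eventJ")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the Elasticsearch-backed DataFrame (two trimmed sample records).
    val sample = Seq(
      """{"eventJ":{"action":"ACCEPT from from 172.16.1.112:17500 to 172.16.1.255:17500 UDP","code":"1702","process_id":"1","system_id":"ESG"},"type":"Things"}""",
      """{"eventJ":{"action":"ACCEPT from from 172.16.1.1:38329 to 239.255.255.250:1900 UDP","code":"1702","process_id":"1","system_id":"ESG"},"type":"Things"}"""
    ).toDS()
    val df = spark.read.json(sample)

    // Flatten, then expose the result to Spark SQL under a (hypothetical) view name.
    val flat = flatten(df)
    flat.createOrReplaceTempView("events")

    // The former struct fields can now be queried like ordinary columns.
    spark.sql(
      "SELECT action, code, process_id, system_id FROM events WHERE code = '1702'"
    ).show(false)

    spark.stop()
  }
}
```

With the real data, calling `FlattenEventJ.flatten(df)` on the Elasticsearch-backed `df` should give the same kind of flat table. On Spark 1.x, `registerTempTable` takes the place of `createOrReplaceTempView`. Note that `eventJ` has its own `type` and `timestamp` fields, so selecting the top-level `type` alongside `eventJ.*` would produce two columns with the same name unless one of them is aliased. If flattening is not needed, Spark SQL can also read struct fields directly with dot notation, e.g. `SELECT eventJ.action, eventJ.code FROM tmpT`.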
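【Comments】:
- I'm not sure I understand your question. Do you need to format the DataFrame as an HTML table? Or do you want to extract the JSON object in the eventJ column into a DataFrame whose schema resembles that HTML table?
- I just want to extract eventJ and split its fields into database columns.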
Tags: apache-spark apache-spark-sql spark-dataframe