【问题标题】:Loading a denormalized Hive table into Elasticsearch with Spark使用 Spark 将非规范化 Hive 表加载到 Elasticsearch
【发布时间】:2017-07-23 16:39:00
【问题描述】:

所以,我找到了很多相反的答案,但不是这个。现在听起来很傻,因为 Elasticsearch 纯粹是处理非规范化数据,但这就是我们遇到的问题。我们有一个格式如下的表格:

+----+--------+--------+--------+--------+---------+
| id | attr_1 | attr_2 | attr_3 | attr_4 | fst_nm  |
+----+--------+--------+--------+--------+---------+
|  1 |   2984 |   0324 |  38432 |        | john    |
|  2 |   2343 |  28347 | 238493 |  34923 | patrick |
|  3 |   3293 |   3823 |  38423 |  34823 | george  |
+----+--------+--------+--------+--------+---------+

attr_x 代表相同的东西,假设它们是另一个表的外键,当该表在规范化世界中分离时。因此,所有attrs 都存在于一个单独的表中。然而,这些桌子被去规范化,它们都被倾倒在一张长桌子上。通常,加载到 Elasticsearch 中并不是什么大问题,但是这个表很大,大约有 1000 多列。我们希望将这些attrs 存储为一个数组在 Elasticsearch 中,如下所示:

_source: {
  "id": 1,
  "fst_nm": "john",
  "attrs": [
    2984,
    0324,
    38432
  ]
}

代替:

_source: {
  "id": 1,
  "fst_nm": "john",
  "attr_1": 2984,
  "attr_2": 0324,
  "attr_3": 38432
}

当我们使用默认的 Spark 进程时,它只会创建底部的 Elasticsearch 文档。我有几个想法是创建一个attrs 的新表并取消透视它们,然后按 ID 查询该表,以获取属性,所以它看起来像这样:

+-----+--------+
| id  |  attr  |
+-----+--------+
|   1 |   2984 |
|   1 |   0324 |
|   1 |  38432 |
|   2 |   2343 |
| ... |    ... |
|   3 |  34823 |
+-----+--------+

然后我们可以使用 Spark SQL 在这个新创建的表上按 id 查询,获取 attrs,但是我们如何使用 Spark 将其作为数组插入到 Elasticsearch 中?

我的另一个想法是在 Hive 中创建一个新表,并将 attrs 更改为 Hive 复杂类型的数组,但我不知道该怎么做。另外,如果我们使用 Spark 查询 Hive 中的表,当结果以数组形式返回时,是否可以轻松转储到 Elasticsearch 中?

【问题讨论】:

    标签: scala apache-spark elasticsearch hive apache-spark-sql


    【解决方案1】:

    至于数据转换部分,可以使用array将几列集合为一个数组,然后可以使用.write.json("jsonfile")写入json文件:

    import org.apache.spark.sql.functions.col
    val attrs = df.columns.filter(_.startsWith("attr")).map(col(_))
    
    val df_array = df.withColumn("attrs", array(attrs:_*)).select("id", "fst_nm", "attrs")
    
    df_array.toJSON.collect
    //res8: Array[String] = Array({"id":1,"fst_nm":"john","attrs":[2984,324,38432,null]}, {"id":2,"fst_nm":"patrick","attrs":[2343,28347,238493,34923]})
    

    写入文件:

    df_array.write.json("/PATH/TO/jsonfile")
    

    【讨论】:

    • 这看起来像我想要的,但是我在col 上找不到符号。这是为旧版本的 Scala/Spark 设计的吗?
    • col是sql函数,需要导入,看更新。
    猜你喜欢
    • 1970-01-01
    • 2011-03-27
    • 2017-02-09
    • 1970-01-01
    • 1970-01-01
    • 2016-07-15
    • 1970-01-01
    • 2023-03-18
    • 1970-01-01
    相关资源
    最近更新 更多