【问题标题】:Push data to mongoDB using spark from hive使用 hive 中的 spark 将数据推送到 mongoDB
【发布时间】:2021-10-17 10:01:49
【问题描述】:

我想使用 sql 查询从 hive 中提取数据,将其转换为嵌套数据帧,并使用 spark 将其推送到 mongodb。 谁能提出一个有效的方法来做到这一点。

例如: 平面查询结果 --> {"columnA":123213 ,"Column3 : 23,"Column4" : null,"Column5" : "abc"}

要推送到 mongo 的嵌套记录 --> { “列A”:123213, “新列”:{ “第 3 列:23, “列 4”:空, “第 5 列”:“abc” } }

【问题讨论】:

  • 您是否正在寻找将 pyspark/scala 代码写入 1. 从 hive 读取数据 2. 执行转换以创建嵌套数据和 3. 将此数据写入 mongodb 的答案,或者您是否正在寻找答案将演示如何执行转换以创建嵌套数据?
  • @ggordon 可以从 hive 中提取平面数据,但了解如何执行该数据的转换以创建嵌套数据然后将该数据推送到 mongodb 会很有帮助

标签: sql mongodb dataframe apache-spark hive


【解决方案1】:

您可以使用 spark sql 中的map 函数来实现所需的转换,例如

df.selectExpr("ColumnA","map('Column3',Column3,'Column4',Column4,'Column5',Column5) as newcolumn")

或者您可以在创建临时视图后在 Spark 会话中运行以下命令

df.createOrReplaceTempView("my_temp_view")
sparkSession.sql("<insert sql below here>")
SELECT
    ColumnA,
    map(
        "Column3",Column3,
        "Column4",Column4,
        "Column5",Column5
    ) as newcolumn
FROM 
   my_temp_view

此外,如果这是您希望使用的唯一转换,您也可以在 hive 上运行此查询。

其他资源:

让我知道这是否适合你。

【讨论】:

  • 感谢您的回答,这似乎真的很有帮助,如果可行,会告诉您!
【解决方案2】:

对于您的 hive 数据框的嵌套级别数组,我们可以尝试以下操作:

from pyspark.sql import functions as F

df.withColumn(
  "newcolumn",
  F.struct(
    F.col("Column3").alias("Column3"),
    F.col("Column4").alias("Column4"),
    F.col("Column5").alias("Column5")
  )
)

后跟groupByF.collect_list 以创建包含在单个记录中的嵌套数组。

然后我们可以把它写到 mongo

df.write.format('com.mongodb.spark.sql.DefaultSource').mode("append").save()

【讨论】:

  • 谢谢,试试这个!
猜你喜欢
  • 2020-11-16
  • 2023-03-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-21
  • 2015-02-09
  • 2015-11-10
相关资源
最近更新 更多