【问题标题】:Using Java Spark Load Existing Mongodb to Hive使用 Java Spark 将现有的 Mongodb 加载到 Hive
【发布时间】:2020-06-21 23:57:18
【问题描述】:

目标

我正在使用 Spark (2.3.1) 和 Java 处理 ETL Mongodb to Hive

我在哪里 RN

我可以加载现有的 Mongodb 并显示/查询数据

问题

但我无法将其保存到 hive 表中。

MongoDB数据结构

当前的 mongodb 数据是复杂的嵌套 dict(结构类型),有没有办法进行转换以更轻松地保存在 hive 中?

public static void main(final String[] args) throws InterruptedException {
    // spark session read mongodb
    SparkSession mongo_spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("mongo_spark.master", "local")
            .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
            .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
            .enableHiveSupport()
            .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

    // Load data and infer schema, disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // createOrReplaceTempView
    implicitDS.createOrReplaceTempView("my_table");
    // mongo_spark.sql("DROP TABLE IF EXISTS my_table");
    // cannot save table this step
    // implicitDS.write().saveAsTable("my_table");
    // can query the temp view
    mongo_spark.sql("SELECT * FROM my_table limit 1").show();

    // More application logic would go here...
    JavaMongoRDD<Document> rdd = MongoSpark.load(jsc);
    System.out.println(rdd.count());
    System.out.println(rdd.first().toJson());

    jsc.close();
}

有没有人有在 Java 中做这个 ETL spark 工作的经验? 我真的很感激。

【问题讨论】:

  • 你为什么使用MongoSpark.load(jsc)两次?为什么输入和输出URI是同一个集合?蜂巢有什么问题? Hive 还支持嵌套数据
  • 一些代码只是为了我的测试,看看我是否可以读取数据。你知道我应该怎么做才能从 mongodb 中保存 hive 表吗?
  • sqlContext.saveAsTable()?
  • 有没有办法使用现有的 mongodb 模式创建一个 hive 表?
  • CREATE TABLE name AS SELECT fields FROM mongoTable ... ?

标签: java apache-spark hadoop hive


【解决方案1】:

随着工作的深入,我意识到这是一个广泛的问题。这个问题的准确答案是

public static void main(final String[] args) throws InterruptedException {
    // spark session read mongodb
    SparkSession mongo_spark = SparkSession.builder()
            .master("local")
            .appName("MongoSparkConnectorIntro")
            .config("mongo_spark.master", "local")
            .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test_db.test_collection")
            .config("spark.mongodb.output.uri", "mongodb://localhost:27017/test_db.test_collection")
            .enableHiveSupport()
            .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(mongo_spark.sparkContext());

    // Load data and infer schema, disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // createOrReplaceTempView
    implicitDS.createOrReplaceTempView("my_table");
    mongo_spark.sql("DROP TABLE IF EXISTS my_table");
    implicitDS.write().saveAsTable("my_table");

    jsc.close();
}

所以实际上代码正在运行,但让我感到困惑的是我的数据中发生了一些事情

  1. 单个字段的数据类型冲突(com.mongodb.spark.exceptions.MongoTypeConversionException:无法转换...) - 这可以解决在加载时增加样本大小,检查 java 语法 How to config Java Spark sparksession samplesize

  2. 嵌套结构中的 nulltype - 我仍在寻找 Java 中的解决方案

由于我获得了 scala 代码示例的大量研究,我会尽力记录我的发现,希望它可以节省您一天的时间

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-17
    • 2021-09-20
    • 2016-07-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-23
    相关资源
    最近更新 更多