【Question title】: Java: load data into Parquet
【Posted】: 2018-07-09 21:48:41
【Question】:

I have source data in JSON format (plain strings). Example:

{"_t":1480647647,"_p":"r@test.com","_n":"aloaded","device_type":"desktop"}
{"_t":1480647676,"_p":"tt@test.com","_n":"aloaded","device_type":"desktop"}

Here _t is the timestamp. 1480647647 represents Friday, 2 December 2016, 03:00:47.
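The stated date can be checked with a few lines of `java.time`. This is only a sanity-check sketch: the class name `EpochSecondsCheck` is made up for illustration, and it assumes the value counts seconds since the Unix epoch and that the quoted wall-clock time is in UTC.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Hypothetical helper, not part of the original code.
final class EpochSecondsCheck {

    // Interprets the raw value as SECONDS since 1970-01-01T00:00:00Z (assumption).
    static ZonedDateTime fromEpochSeconds(long seconds) {
        return Instant.ofEpochSecond(seconds).atZone(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        ZonedDateTime t = fromEpochSeconds(1480647647L);
        // Prints the instant in ISO-8601 form followed by the day of the week.
        System.out.println(t.toInstant() + " " + t.getDayOfWeek());
    }
}
```

Read as seconds, the value lands in December 2016, which matches the date given above.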

I need to load this data into a Parquet file. The relevant part of the code:

            // loaded_prq_fpath: full path, including the file name

            MessageType APPLOADED_FILE_SCHEMA = Types.buildMessage()
                .required(INT64).as(TIMESTAMP_MILLIS).named("time")
                .required(BINARY).as(UTF8).named("email")
                .required(BINARY).as(UTF8).named("device_type")
                .named("AppLoaded");

            SimpleGroupFactory GROUP_FACTORY_APP_LOADED =
                new SimpleGroupFactory(APPLOADED_FILE_SCHEMA);

            File fp = new File(loaded_prq_fpath);
            Path file1 = new Path(fp.toString());
            logger.info(file1.getName());

            ParquetWriter<Group> writer1 =
                ExampleParquetWriter.builder(file1)
                    .withType(APPLOADED_FILE_SCHEMA)
                    .build();
            ...
            while (jp.nextToken() == JsonToken.START_OBJECT) {
                // read everything from this START_OBJECT to the matching END_OBJECT {}
                // and return it as a tree model TreeNode
                JsonNode node = mapper.readTree(jp);
                TotalEventsCnt++;
                if (node.get("_n").toString().equals("\"aloaded\"")) {
                    LoadCounter++;
                    ((ObjectNode) node).remove("_n");
                    Group group1 = GROUP_FACTORY_APP_LOADED.newGroup();
                    group1.add("time",       node.get("_t").asLong());
                    group1.add("email",      node.get("_p").toString());
                    group1.add("device_type",node.get("device_type").toString());
                    writer1.write(group1);
                }
                ...
            writer1.close();
            SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.master", "local[*]")
                .getOrCreate();

The data loads without any errors. Next, I display the Parquet data:

            Dataset<Row> df_appl = spark.read().load(loaded_prq_fpath);
            df_appl.show(); 

            df_appl.createOrReplaceTempView("v_appl");
            df_appl.printSchema();
            Dataset<Row> df_v_appl = spark.sql("SELECT CAST(time AS DATE) AS the_datetime from v_appl");
            df_v_appl.show();

The output is:

+--------------------+------------+-----------+
|                time|       email|device_type|
+--------------------+------------+-----------+
|1970-01-17 22:17:...|"r@test.com"|  "desktop"|
|1970-01-17 22:17:...|"t@test.com"|  "desktop"|
|1970-01-17 22:59:...|"a@test.com"|  "desktop"|
|1970-01-17 22:59:...|"e@test.com"|  "desktop"|
+--------------------+------------+-----------+

 root
 |-- time: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- device_type: string (nullable = true)

+------------+
|the_datetime|
+------------+
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
+------------+   
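The 1970 dates are what one would expect if the raw `_t` value (seconds) is read back as milliseconds, which is the unit a TIMESTAMP_MILLIS column stores. The sketch below illustrates that arithmetic with plain `java.time`. The class `MisreadAsMillis` is hypothetical, and the 5-hour offset is only an assumption chosen because it is consistent with the `22:17` shown in the table; the actual session time zone is not stated in the question.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, not part of the original code.
final class MisreadAsMillis {

    // Reads the raw value as MILLISECONDS since the epoch.
    static Instant asMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Renders an instant as local date-time at a fixed UTC offset given in hours.
    static LocalDateTime atUtcOffset(Instant instant, int hours) {
        return instant.atOffset(ZoneOffset.ofHours(hours)).toLocalDateTime();
    }

    public static void main(String[] args) {
        Instant misread = asMillis(1480647647L);
        System.out.println(misread);
        // Assumption: the displaying session runs 5 hours behind UTC.
        System.out.println(atUtcOffset(misread, -5));
    }
}
```

About 1.48 billion milliseconds is only a little over 17 days, so every row ends up in mid-January 1970.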

I think the problem is in this line:

.required(INT64).as(TIMESTAMP_MILLIS).named("time")

or here:

 group1.add("time",       node.get("_t").asLong());

Please help. (I am new to Java; I last used it 8 years ago.)

【Comments】:

Tags: java timestamp parquet


【Answer 1】:

I solved my problem with this:

    group1.add("time",       node.get("_t").asLong()*1000);

because _t holds seconds, while a TIMESTAMP_MILLIS column stores milliseconds since the epoch:

multiply by 1000, since Java is expecting milliseconds:

Look here
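The conversion can also be isolated in a small helper so the unit change is explicit and overflow is not silent. This is a minimal sketch, not the original poster's code: `SecondsToMillis` and `toEpochMillis` are hypothetical names, and it assumes `_t` always holds whole seconds since the Unix epoch.

```java
import java.time.Instant;

// Hypothetical helper, not part of the original code.
final class SecondsToMillis {

    // Converts epoch seconds to epoch milliseconds.
    // Math.multiplyExact throws ArithmeticException instead of wrapping on overflow.
    static long toEpochMillis(long epochSeconds) {
        return Math.multiplyExact(epochSeconds, 1000L);
    }

    public static void main(String[] args) {
        long millis = toEpochMillis(1480647647L);
        System.out.println(millis);
        // Reading the converted value as milliseconds gives back the 2016 date.
        System.out.println(Instant.ofEpochMilli(millis));
    }
}
```

In the loop from the question, the call would then look like `group1.add("time", SecondsToMillis.toEpochMillis(node.get("_t").asLong()));`.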

【Comments】:
