【Posted】: 2018-07-09 21:48:41
【Question】:
I have source data in JSON format (plain strings). Example:
{"_t":1480647647,"_p":"r@test.com","_n":"aloaded","device_type":"desktop"}
{"_t":1480647676,"_p":"tt@test.com","_n":"aloaded","device_type":"desktop"}
where _t is a timestamp. 1480647647 represents Friday, 2 December 2016, 03:00:47 UTC.
I need to load this data into a Parquet file. The relevant code:
// loaded_prq_fpath - full path with file name
MessageType APPLOADED_FILE_SCHEMA = Types.buildMessage()
    .required(INT64).as(TIMESTAMP_MILLIS).named("time")
    .required(BINARY).as(UTF8).named("email")
    .required(BINARY).as(UTF8).named("device_type")
    .named("AppLoaded");
SimpleGroupFactory GROUP_FACTORY_APP_LOADED =
    new SimpleGroupFactory(APPLOADED_FILE_SCHEMA);
File fp = new File(loaded_prq_fpath);
Path file1;
file1 = new Path(fp.toString());
logger.info(file1.getName());
ParquetWriter<Group> writer1 =
    ExampleParquetWriter.builder(file1)
        .withType(APPLOADED_FILE_SCHEMA)
        .build();
...
while(jp.nextToken() == JsonToken.START_OBJECT) {
    // read everything from this START_OBJECT to the matching END_OBJECT {}
    // and return it as a tree model TreeNode
    JsonNode node = mapper.readTree(jp);
    TotalEventsCnt++;
    if (node.get("_n").toString().equals("\"aloaded\"")) {
        LoadCounter++;
        ((ObjectNode) node).remove("_n");
        Group group1 = GROUP_FACTORY_APP_LOADED.newGroup();
        group1.add("time", node.get("_t").asLong());
        group1.add("email", node.get("_p").toString());
        group1.add("device_type", node.get("device_type").toString());
        writer1.write(group1);
    }
...
writer1.close();
SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL basic example")
    .config("spark.master", "local[*]")
    .getOrCreate();
The data loads without any errors. Next I display the Parquet data:
Dataset<Row> df_appl = spark.read().load(loaded_prq_fpath);
df_appl.show();
df_appl.createOrReplaceTempView("v_appl");
df_appl.printSchema();
Dataset<Row> df_v_appl = spark.sql("SELECT CAST(time AS DATE) AS the_datetime from v_appl");
df_v_appl.show();
The output is:
+--------------------+------------+-----------+
|                time|       email|device_type|
+--------------------+------------+-----------+
|1970-01-17 22:17:...|"r@test.com"|  "desktop"|
|1970-01-17 22:17:...|"t@test.com"|  "desktop"|
|1970-01-17 22:59:...|"a@test.com"|  "desktop"|
|1970-01-17 22:59:...|"e@test.com"|  "desktop"|
+--------------------+------------+-----------+
root
|-- time: timestamp (nullable = true)
|-- email: string (nullable = true)
|-- device_type: string (nullable = true)
+------------+
|the_datetime|
+------------+
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
+------------+
I think the problem is in this line:
.required(INT64).as(TIMESTAMP_MILLIS).named("time")
or here:
group1.add("time", node.get("_t").asLong());
Please help. (I'm new to Java; I last used it 8 years ago.)
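One way to test the suspicion about those two lines is to interpret the sample `_t` value both as epoch seconds and as epoch milliseconds, using only `java.time`. The sketch below is a standalone check and not part of the original program. The class name `EpochCheck` and its method names are hypothetical, and it assumes `_t` holds seconds, which is what the 2016 date given above implies.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class EpochCheck {
    // Interpret the raw value as seconds since the Unix epoch.
    public static Instant asSeconds(long raw) {
        return Instant.ofEpochSecond(raw);
    }

    // Interpret the same raw value as milliseconds since the Unix epoch,
    // which is the unit a TIMESTAMP_MILLIS column stores.
    public static Instant asMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Convert epoch seconds to epoch milliseconds.
    public static long secondsToMillis(long seconds) {
        return TimeUnit.SECONDS.toMillis(seconds);
    }

    public static void main(String[] args) {
        long t = 1480647647L; // "_t" from the first sample record

        System.out.println(asSeconds(t));                 // 2016-12-02T03:00:47Z
        System.out.println(asMillis(t));                  // 1970-01-18T03:17:27.647Z
        System.out.println(asMillis(secondsToMillis(t))); // 2016-12-02T03:00:47Z
    }
}
```

Read as milliseconds, the value falls on 1970-01-18 03:17 UTC, which would appear as 1970-01-17 22:17 in a UTC-5 local time zone and so appears consistent with the output above. If that is the cause, writing `TimeUnit.SECONDS.toMillis(node.get("_t").asLong())` into the `time` field might be enough, though this has not been tested against the Parquet writer here.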
【Discussion】: