【发布时间】:2020-03-02 19:10:10
【问题描述】:
我想转换从 AVRO 文件导入的 Spark 2.4 数据帧(其中包含来自 Google Analytics 的跟踪数据)。
架构中有趣的部分如下所示:
root
|-- visitorId: long (nullable = true)
|-- visitNumber: long (nullable = true)
|-- visitId: long (nullable = true)
|-- visitStartTime: long (nullable = true)
|-- date: string (nullable = true)
|-- hits: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- hitNumber: long (nullable = true)
| | |-- time: long (nullable = true)
| | |-- hour: long (nullable = true)
| | |-- minute: long (nullable = true)
| | |-- customDimensions: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- index: long (nullable = true)
| | | | |-- value: string (nullable = true)
生成的数据集应该几乎是平坦的,没有深度嵌套的结构。像hits 这样的数组应该有自己的行,这很容易通过explode 函数实现。像hits.customDimensions 这样的数组比较棘手。每个数组元素都有一个index 字段(它不对应于数组位置),并且对于每个可能的值,都应该创建一个新列。最终架构应如下所示:
root
|-- visitorId: long (nullable = true)
|-- visitNumber: long (nullable = true)
|-- visitId: long (nullable = true)
|-- visitStartTime: long (nullable = true)
|-- hit_date: string (nullable = true)
|-- hit_hitNumber: long (nullable = true)
|-- hit_time: long (nullable = true)
|-- hit_hour: long (nullable = true)
|-- hit_minute: long (nullable = true)
|-- hit_customDimension_1: string (nullable = true)
|-- hit_customDimension_9: string (nullable = true)
根据在数据中找到的实际索引,hit_customDimension_X 可能会更频繁地出现。
到目前为止,数据集是这样转换的:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.explode_outer;
public class Flattener {
public static void main(String[] args) {
String avroFiles = String.join(",", args); // @TODO: named parameters
SparkConf conf = new SparkConf().setAppName("Simple Application").set("spark.ui.port", "8080");
SparkSession spark = SparkSession.builder().appName("Simple Application").config(conf).getOrCreate();
SQLContext sqlContext = spark.sqlContext();
Dataset<Row> sessions = spark.read().format("avro").load(avroFiles).limit(1000);
//explode the hits to more rows, remove original array
sessions = sessions.withColumn("hit", explode_outer(col("hits"))).drop(col("hits"));
//sample the distinct indinces
Dataset<Row> r = result.sample(0.1).select(explode(col("hit.customDimensions"))).select(col("col.index")).distinct();
List<Long> indices = new LinkedList<Long>();
r.foreach(dr -> {
indices.add(dr.getLong(0));
});
Iterator<Long> l = indices.iterator();
// for each found index, extract the array element to its own column
while (l.hasNext()) {
Long i = l.next();
result.withColumn("hit_customDimension" + "_" + i.toString(), array_find("hit.customDimensions", "index", i));
}
//TODO: move hit column up one level
}
问题是:没有这样的array_find 函数。我找到了filter function(请参阅在数组列上过滤部分),但似乎过滤行而不是数组元素。
我想可以编写一个 UDF 来执行此操作,但据我所知,它们可能会降低性能。由于我们的易失性和大型数据集(数 TB),性能非常受关注。 这些任务看起来并不少见,所以我想知道是否有一种内置的方法可以做到这一点,我只是错过了。
【问题讨论】:
标签: java apache-spark apache-spark-sql