【问题标题】:Create new columns by maps in array column inside a Spark Dataframe通过 Spark Dataframe 内的数组列中的映射创建新列
【发布时间】:2020-03-02 19:10:10
【问题描述】:

我想转换从 AVRO 文件导入的 Spark 2.4 数据帧(其中包含来自 Google Analytics 的跟踪数据)。

架构中有趣的部分如下所示:

root
 |-- visitorId: long (nullable = true)
 |-- visitNumber: long (nullable = true)
 |-- visitId: long (nullable = true)
 |-- visitStartTime: long (nullable = true)
 |-- date: string (nullable = true)
 |-- hits: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hitNumber: long (nullable = true)
 |    |    |-- time: long (nullable = true)
 |    |    |-- hour: long (nullable = true)
 |    |    |-- minute: long (nullable = true)
 |    |    |-- customDimensions: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- index: long (nullable = true)
 |    |    |    |    |-- value: string (nullable = true)

生成的数据集应该几乎是平坦的,没有深度嵌套的结构。像hits 这样的数组应该有自己的行,这很容易通过explode 函数实现。像hits.customDimensions 这样的数组比较棘手。每个数组元素都有一个index 字段(它对应于数组位置),并且对于每个可能的值,都应该创建一个新列。最终架构应如下所示:

root
 |-- visitorId: long (nullable = true)
 |-- visitNumber: long (nullable = true)
 |-- visitId: long (nullable = true)
 |-- visitStartTime: long (nullable = true)
 |-- hit_date: string (nullable = true)
 |-- hit_hitNumber: long (nullable = true)
 |-- hit_time: long (nullable = true)
 |-- hit_hour: long (nullable = true)
 |-- hit_minute: long (nullable = true)
 |-- hit_customDimension_1: string (nullable = true)
 |-- hit_customDimension_9: string (nullable = true)

根据在数据中找到的实际索引,hit_customDimension_X 可能会更频繁地出现。

到目前为止,数据集是这样转换的:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.explode_outer;

public class Flattener {
  public static void main(String[] args) {
    String avroFiles = String.join(",", args); // @TODO: named parameters
    SparkConf conf = new SparkConf().setAppName("Simple Application").set("spark.ui.port", "8080");
    SparkSession spark = SparkSession.builder().appName("Simple Application").config(conf).getOrCreate();
    SQLContext sqlContext = spark.sqlContext();
    Dataset<Row> sessions = spark.read().format("avro").load(avroFiles).limit(1000);

    //explode the hits to more rows, remove original array
    sessions = sessions.withColumn("hit", explode_outer(col("hits"))).drop(col("hits"));

    //sample the distinct indinces
    Dataset<Row> r = result.sample(0.1).select(explode(col("hit.customDimensions"))).select(col("col.index")).distinct();
    List<Long> indices = new LinkedList<Long>();
    r.foreach(dr -> {
        indices.add(dr.getLong(0));
    });
    Iterator<Long> l = indices.iterator();
    // for each found index, extract the array element to its own column
    while (l.hasNext()) {
        Long i = l.next();
        result.withColumn("hit_customDimension" + "_" + i.toString(), array_find("hit.customDimensions", "index", i));
    }

    //TODO: move hit column up one level
}

问题是:没有这样的array_find 函数。我找到了filter function(请参阅在数组列上过滤部分),但似乎过滤行而不是数组元素。

我想可以编写一个 UDF 来执行此操作,但据我所知,它们可能会降低性能。由于我们的易失性和大型数据集(数 TB),性能非常受关注。 这些任务看起来并不少见,所以我想知道是否有一种内置的方法可以做到这一点,我只是错过了。

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    您似乎正在寻找一个 SQL 函数,该函数通过给定索引从数组中提取元素。

    这个函数已经存在于 Spark API 中,但是是一种“隐藏”,因为它不是作为一个单独的函数实现的,而是作为 apply 类中的 apply 方法实现的。请查看scaladoc

      /**
       * Extracts a value or values from a complex type.
       * The following types of extraction are supported:
       * <ul>
       * <li>Given an Array, an integer ordinal can be used to retrieve a single value.</li>
       * <li>Given a Map, a key of the correct type can be used to retrieve an individual value.</li>
       * <li>Given a Struct, a string fieldName can be used to extract that field.</li>
       * <li>Given an Array of Structs, a string fieldName can be used to extract filed
       *    of every struct in that array, and return an Array of fields.</li>
       * </ul>
       * @group expr_ops
       * @since 1.4.0
       */
      def apply(extraction: Any): Column
    

    所以,我建议用col("hit.customDimensions")(i) 替换你的array_find(...)

    result.withColumn("hit_customDimension" + "_" + i.toString(), col("hit.customDimensions")(i));
    

    [UPD]

    正如 cmets 中有效指出的那样,customDimensions 可能是一个稀疏数组,其中 index 不是一个序数,而是一个任意整数。

    在这种情况下,从一开始就将Array 转换为Map 看起来是最自然的。

    1. Array[Struct[Int, String]]转换为Map[Int, String]
    result.withColumn("hit_customDimensions_Map", 
      map_from_arrays(col("hit.customDimensions")("index"), col("hit.customDimensions")("value")))
    
    
    1. 并更改转置customDimensions 列的方式:
    result.withColumn("hit_customDimension" + "_" + i.toString(), col("hit_customDimensions_Map")(i));
    

    【讨论】:

    • 感谢您的想法。不幸的是,数组中结构的index 字段与它们在数组中的序号位置不对应(更新了问题以指出这一点)。可能存在这样的事情:[{ index: 13, value: "foo"}, { index: 20, value: "bar"}] 文档还说:“给定一个结构数组,字符串 fieldName 可用于提取该数组中每个结构的字段,并返回一个字段数组。”即使这还不够,因为我需要索引旁边的值。
    • 嗨@MatthiasHauert,我处理了您的评论,请查看。
    • 太棒了! map_from_arrays 是这里缺少的部分。由于是java,直接调用apply函数即可。此外,不选择“值”,但整个对象甚至适用于具有更多字段的结构。 result.withColumn("hit_customDimensions_Map", map_from_arrays(col("hit.customDimensions").apply("index"), col("hit.customDimensions")))
    猜你喜欢
    • 2017-12-11
    • 1970-01-01
    • 2023-02-08
    • 1970-01-01
    • 1970-01-01
    • 2022-01-15
    • 2015-07-25
    • 2021-01-26
    • 2018-09-18
    相关资源
    最近更新 更多