【问题标题】:How to merge two parquet files having different schema in spark (java)如何在火花(java)中合并具有不同模式的两个镶木地板文件
【发布时间】:2021-10-26 13:33:51
【问题描述】:

我有 2 个列数不同的镶木地板文件,并尝试将它们与以下代码 sn-p 合并

Dataset<Row> dataSetParquet1 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\ABC\\abc.parquet");
              
Dataset<Row> dataSetParquet2 = testSparkSession.read().option("mergeSchema",true).parquet("D:\\EFG\\efg.parquet");                    
  
dataSetParquet1.unionByName(dataSetParquet2);
// dataSetParquet1.union(dataSetParquet2);


对于unionByName() 我得到了错误:

Caused by: org.apache.spark.sql.AnalysisException: Cannot resolve column name

对于union(),我得到了错误:

Caused by: org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the first table has 7 columns and the second table has 6 columns;;

如何在 java 中使用 spark 合并这些文件?

更新:示例

数据集 1:

epochMillis   | one | two | three| four
--------------------------------------
1630670242000 | 1   | 2   | 3    | 4
1630670244000 | 1   | 2   | 3    | 4
1630670246000 | 1   | 2   | 3    | 4

数据集2:

epochMillis   | one | two | three|five
---------------------------------------
1630670242000 | 11  | 22  | 33   | 55
1630670244000 | 11  | 22  | 33   | 55
1630670248000 | 11  | 22  | 33   | 55

合并后的最终数据集:

epochMillis   | one | two | three|four |five
--------------------------------------------
1630670242000 | 11  | 22  | 33   |4    |55
1630670244000 | 11  | 22  | 33   |4    |55
1630670246000 | 1   | 2   | 3    |4    |null
1630670248000 | 11  | 22  | 33   |null |55

如何获得合并两个数据集的结果?

【问题讨论】:

    标签: java apache-spark apache-spark-sql


    【解决方案1】:

    要合并来自两个不同数据框的两行,首先连接两个数据框,然后根据要合并的方式选择正确的列。

    所以对于你的情况,这意味着:

    • 从拼花位置分别读取两个数据帧
    • 在它们的 epochTime 列上加入两个数据帧,使用 full_outer 连接,因为您希望所有行都存在于一个数据帧中,但不存在于另一个数据帧中
    • 从复制了两个数据帧的所有列的新数据帧中,使用函数columnMerges(以下实现)选择合并的列
    • [可选] 按epochTime重新排序最终数据帧

    翻译成代码:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    Dataset<Row> dataframe1 = testSparkSession.read().parquet("D:\\ABC\\abc.parquet");
    Dataset<Row> dataframe2 = testSparkSession.read().parquet("D:\\EFG\\efg.parquet");
    
    dataframe1.join(dataframe2, dataframe1.col("epochTime").equalTo(dataframe2.col("epochTime")), "full_outer")
      .select(Selector.columnMerges(dataframe2, dataframe1))
      .orderBy("epochTime")
    

    注意:当我们读取 parquets 时,不需要 mergeSchema 选项,因为对于每个数据帧,我们只读取一个 parquet 文件,因此只有一个模式

    对于合并函数Selector.columnMerges,对于每一行,我们要做的是:

    • 如果两个数据帧中都存在该列,则在dataframe2 中取值,如果不为空,则在dataframe1 中取值
    • 如果该列仅存在于dataframe2 中,则取dataframe2 中的值
    • 如果该列仅存在于dataframe1 中,则取值dataframe1

    所以我们首先构建dataframe1 的列集、dataframe2 的列集以及来自两个数据帧的列列表,并进行了重复数据删除。然后我们遍历这个列列表,对每个列应用之前的规则:

    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    import static org.apache.spark.sql.functions.when;
    
    public class Selector {
    
      public static Column[] columnMerges(Dataset<Row> main, Dataset<Row> second) {
        List<Column> columns = new ArrayList<>();
    
        Set<String> columnsFromMain = new HashSet<>(Arrays.asList(main.columns()));
        Set<String> columnsFromSecond = new HashSet<>(Arrays.asList(second.columns()));
    
        List<String> columnNames = new ArrayList<>(Arrays.asList(main.columns()));
        for (String column: second.columns()) {
          if (!columnsFromMain.contains(column)) {
            columnNames.add(column);
          }
        }
    
        for (String column : columnNames) {
          if (columnsFromMain.contains(column) && columnsFromSecond.contains(column)) {
            columns.add(when(main.col(column).isNull(), second.col(column)).otherwise(main.col(column)).as(column));
          } else if (columnsFromMain.contains(column)) {
            columns.add(main.col(column).as(column));
          } else {
            columns.add(second.col(column).as(column));
          }
        }
    
        return columns.toArray(new Column[0]);
      }
    }
    

    【讨论】:

      【解决方案2】:

      您可以使用mergeSchema 选项并在parquet 方法中添加要合并的拼花文件的所有路径,如下所示:

      Dataset<Row> finalDataset = testSparkSession.read()
        .option("mergeSchema", true)
        .parquet("D:\\ABC\\abc.parquet", "D:\\EFG\\efg.parquet");
      

      存在于第一个数据集中但不在第二个数据集中的所有列将在第二个数据集中设置为null

      【讨论】:

      • 感谢@Vincent Doba。我们不能合并数据集吗? dataSetParquet1 和 dataSetParquet2?
      • 我不明白你的新问题。是否要在分别读取两个数据集后合并(在数据集 1 的末尾添加数据集 2 的行)这两个数据集?还是您想加入它们(合并具有共同 id 的 dataset1 和 dataset2 的行)?您能否添加一个带有数据集 1 样本、数据集 2 样本和最终预期输出的小示例,例如这个问题:stackoverflow.com/q/68992621/6807769
      • 我在问题部分添加了一个更新,你能检查一下吗?
      • 感谢您的示例,它确实解释了您想要做什么。请检查我的新答案stackoverflow.com/a/69049569/6807769
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2016-01-18
      • 1970-01-01
      • 1970-01-01
      • 2020-09-09
      • 2020-01-10
      • 2018-06-28
      • 2018-01-21
      相关资源
      最近更新 更多