【问题标题】:How to convert my java complex object into spark dataframe如何将我的 java 复杂对象转换为 spark 数据框
【发布时间】:2020-12-29 18:10:47
【问题描述】:

我正在使用 java spark,下面是我的代码

JavaRDD<MyComplexEntity> myObjectJavaRDD = resultJavaRDD.flatMap(result -> result.getMyObjects());

DataFrame df = sqlContext.createDataFrame(myObjectJavaRDD, MyComplexEntity.class);

df.saveAsParquetFile("s3a://mybucket/test.parquet");

MyComplexEntity.java

public MyComplexEntity implements Serializable {
     private Identifier identifier;
     private boolean isSwitch1True;
     private String note;
     private java.util.ArrayList<Identifier> secodaryIds;
     ......
} 

标识符.java

public Identifier implements Serializable {
     private int id;
     private String uuid;
     ......
}

问题是我在从 myObjectJavaRDD 创建数据帧时在第 2 步失败。如何将复杂的 java 对象列表转换为数据框。谢谢

【问题讨论】:

    标签: java dataframe apache-spark


    【解决方案1】:

    无论如何,您可以将其转换为 Scala 吗?

    Scala 支持case class 处理这种情况

    对于您的案例,挑战是您有一个 Seq/ArrayInner 案例类 => private java.util.ArrayList&lt;Identifier&gt; secodaryIds;

    所以可以通过以下方式完成

    // inner case class Identifier
    case class Identifier(Id : Integer , uuid : String)
    val innerVal = Seq(Identifier(1,"gsgsg"),Identifier(2,"dvggwgwg"))
    
    // Outer case class MyComplexEntity
    case class MyComplexEntity(notes : String, identifierArray : Seq[Identifier])
    val outerVal = MyComplexEntity("Hello", innerVal)
    

    请注意=>

    outerVal : MyComplexEntity 包含一个 LIST 标识符对象,如下所示

    outerVal: MyComplexEntity = MyComplexEntity(Hello,List(Identifier(1,gsgsg), Identifier(2,dvggwgwg)))

    现在是使用 Dataset

    的实际 spark 方式
    import spark.implicits._
    // Convert Our Input Data in Same Structure as your MyComplexEntity
    // Only Trick is To 'Reflect' A Seq[(Int,String)] => Seq[Identifier]
    // Hence we have to do 2 Mapping once for Outer Case class (MyComplexEntity) And Once For Inner Seq of Identifier
    // If We Just Take this Input Data and Convert To DataSet ( without any Schema Inference)
    // This is How It looks 
    
    val inputData = Seq(("Some DAY",Seq((210,"wert67"),(310,"bill123"))),
                        ("I WILL BE", Seq((420,"henry678"),(1000,"baba123"))),
                        ("Saturday Night",Seq((1000,"Roger123"),(2000,"God345")))
                        )
                        
    val unMappedDs = inputData.toDS
    
    

    给我们=>

    // See how it is Infered
    // unMappedDs: org.apache.spark.sql.Dataset[(String, Seq[(Int, String)])] = [_1: string, _2: array<struct<_1:int,_2:string>>]
    

    但如果我们'正确'映射它 =>

    作为 => // Second element is a Seq[(Int,String)] and We map it into Seq[Identifier] as x._2.map(y =&gt; Identifier(y._1,y._2))

    如下:

    val resultDs = inputData.toDS.map(x =>MyComplexEntity(x._1,x._2.map(y => Identifier(y._1,y._2))))
    resultDs.show(20,false)
    

    我们得到一个类似 =>

    的结构

    resultDs: org.apache.spark.sql.Dataset[MyComplexEntity] = [notes: string, identifierArray: array&lt;struct&lt;Id:int,uuid:string&gt;&gt;]

    数据为:

    +--------------+--------------------------------+
    |notes         |identifierArray                 |
    +--------------+--------------------------------+
    |Some DAY      |[[210,wert67], [310,bill123]]   |
    |I WILL BE     |[[420,henry678], [1000,baba123]]|
    |Saturday Night|[[1000,Roger123], [2000,God345]]|
    +--------------+--------------------------------+
    

    使用 Scala 就这么简单。 谢谢。

    【讨论】:

    • 非常感谢您的详细回答,@SanBan。但是,我无法选择走哪条路。我正在构建由 Java 编写的遗留代码之上。无论如何我可以使用java实现这一点?还是据您所知没有办法?再次感谢。
    • 只需在 Java 语法中使用 map(x =&gt;MyComplexEntity(x._1,x._2.map(y =&gt; Identifier(y._1,y._2))))。如果您喜欢答案,请点赞:-)
    猜你喜欢
    • 1970-01-01
    • 2015-06-05
    • 2023-01-26
    • 1970-01-01
    • 1970-01-01
    • 2022-01-21
    • 1970-01-01
    • 2018-01-12
    • 1970-01-01
    相关资源
    最近更新 更多