【问题标题】:How to read AvroFile into Tuple Class with Java in Flink如何在 Flink 中使用 Java 将 AvroFile 读入 Tuple 类
【发布时间】:2018-12-18 09:03:56
【问题描述】:

我正在尝试读取 Avro 文件并对其执行一些操作,一切正常,但聚合函数,当我使用它们时,它得到以下异常:

aggregating on field positions is only possible on tuple data types

然后我更改我的类以实现 Tuple4(因为我有 4 个字段)但是当我想收集结果时得到 AvroTypeException Unknown Type : T0

这是我的数据和工作类别:

public class Nation{

public Integer N_NATIONKEY;
public String N_NAME;
public Integer N_REGIONKEY;
public String N_COMMENT;

public Integer getN_NATIONKEY() {
    return N_NATIONKEY;
}

public void setN_NATIONKEY(Integer n_NATIONKEY) {
    N_NATIONKEY = n_NATIONKEY;
}

public String getN_NAME() {
    return N_NAME;
}

public void setN_NAME(String n_NAME) {
    N_NAME = n_NAME;
}

public Integer getN_REGIONKEY() {
    return N_REGIONKEY;
}

public void setN_REGIONKEY(Integer n_REGIONKEY) {
    N_REGIONKEY = n_REGIONKEY;
}

public String getN_COMMENT() {
    return N_COMMENT;
}

public void setN_COMMENT(String n_COMMENT) {
    N_COMMENT = n_COMMENT;
}
public Nation() {
}


public static void main(String[] args) throws Exception {
    Configuration parameters = new Configuration();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();


    Path path2 = new Path("/Users/violet/Desktop/nation.avro");

    AvroInputFormat<Nation> format = new AvroInputFormat<Nation>(path2,Nation.class);
    format.configure(parameters);
    DataSet<Nation> nation = env.createInput(format);
    nation.aggregate(Aggregations.SUM,0);


    JobExecutionResult res = env.execute();
}

这里是元组类和与上面相同的工作代码:

public class NationTuple extends Tuple4<Integer,String,Integer,String> {

Integer N_NATIONKEY(){ return this.f0;}
String N_NAME(){return this.f1;}
Integer N_REGIONKEY(){ return this.f2;}
String N_COMMENT(){ return this.f3;}

}

我尝试了这个类并得到了 TypeException(到处都使用 NationTuple 而不是 Nation)

【问题讨论】:

    标签: java apache-flink avro


    【解决方案1】:

    我不认为让你的类实现 Tuple4 是正确的方法。相反,您应该在拓扑中添加一个 MapFunction,将您的 NationTuple 转换为 Tuple4。

    static Tuple4<Integer, String, Integer, String> toTuple(Nation nation) {
      return Tuple4.of(nation.N_NATIONKEY, ...);
    }
    

    然后在你的拓扑调用中:

    inputData.map(p -> toTuple(p)).returns(new TypeHint<Tuple4<Integer, String, Integer, String>(){});
    

    唯一微妙的部分是您需要提供类型提示,以便 flink 可以确定您的函数返回的元组类型。

    另一种解决方案是在进行聚合时使用字段名称而不是元组字段索引。例如:

    groupBy("N_NATIONKEY", "N_REGIONKEY")
    

    这里都有解释:https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#specifying-keys

    【讨论】:

    • 非常感谢,我对此完全陌生,对此一无所知
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-02-11
    • 2017-07-25
    • 1970-01-01
    • 1970-01-01
    • 2019-07-10
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多