【Question title】: How to support multiple KeyBy in Flink
【Posted】: 2018-02-27 23:07:51
【Question】:

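In the code sample below, I am trying to take a stream of employee records { Country, Employer, Name, Salary, Age } and output the highest-paid employee in every country. Unfortunately, chaining multiple keyBy calls does not work.

Only keyBy(Employer) takes effect, so I do not get the correct result. What am I missing?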

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Employee> streamEmployee = env.addSource(
        new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
        .map(new MapFunction<ObjectNode, Employee>() {

            private static final long serialVersionUID = 6111226274068863916L;

            @Override
            public Employee map(ObjectNode value) throws Exception {
                final Gson gson = new GsonBuilder().create();
                Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                return uMsg;
            }
        });

KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
        .keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getCountry();
            }
        }).keyBy(new KeySelector<Employee, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String getKey(Employee value) throws Exception {
                // TODO Auto-generated method stub
                return value.getEmployer();
            }
        });
// This should display the highest-paid employee for a given country and
// a given employer.
DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
        .maxBy("salary");

// Assume toString() is overridden, so print() produces readable output.
uHighlyPaidEmployee.print();

env.execute("Employee-employer log processor");

【Comments】:

    Tags: java apache-kafka apache-flink


    【Answer 1】:

    If you try to replace the code with lambda expressions, you will run into the issue described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html
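
As far as I understand the linked page, the concern is that a Java lambda does not carry the generic type arguments that an anonymous class records, which makes it harder for a framework to work out the key or result type on its own. The following plain-Java sketch (no Flink involved) shows the difference via reflection. The class name LambdaTypeInfoSketch and its helper methods are made up for illustration, and the lambda result reflects how common JDKs behave rather than a guarantee of the language specification.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Plain-Java illustration only; the names here are hypothetical and not part of Flink.
class LambdaTypeInfoSketch {

    // Returns true if the object's class records the generic type arguments
    // of the first interface it implements (e.g. Function<String, Integer>).
    static boolean keepsGenericInterfaceInfo(Object function) {
        Type[] interfaces = function.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    // An anonymous class: the compiler stores Function<String, Integer> in its class file.
    static Function<String, Integer> anonymousClass() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // The same logic as a lambda: the runtime-generated class implements the raw interface.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class keeps generics: "
                + keepsGenericInterfaceInfo(anonymousClass()));
        System.out.println("lambda keeps generics: "
                + keepsGenericInterfaceInfo(lambda()));
    }
}
```

This is why the anonymous KeySelector classes in the question work without extra type hints, while a lambda-based rewrite may need them.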

    【Comments】:

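      【Answer 2】:

      Each keyBy call re-partitions the stream by its own key, so when two calls are chained only the last one (employer) takes effect. Instead, you can define a single KeySelector that returns a composite key: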

      KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
        streamEmployee.keyBy(
          new KeySelector<Employee, Tuple2<String, String>>() {
      
            @Override
            public Tuple2<String, String> getKey(Employee value) throws Exception {
              return Tuple2.of(value.getCountry(), value.getEmployer());
            }
          }
        );
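
To make the difference concrete without a Flink cluster, here is a plain-Java sketch that groups a handful of made-up records twice: once by employer only (effectively what the chained keyBy calls in the question end up doing) and once by a composite (country, employer) key. The class name CompositeKeySketch, the simplified Employee stand-in, and the sample data are all assumptions for illustration; a java.util.List stands in for Flink's Tuple2 as the composite key.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration only; no Flink classes are used here.
class CompositeKeySketch {

    // Hypothetical, simplified stand-in for the Employee POJO from the question.
    static final class Employee {
        final String country;
        final String employer;
        final String name;
        final double salary;

        Employee(String country, String employer, String name, double salary) {
            this.country = country;
            this.employer = employer;
            this.name = name;
            this.salary = salary;
        }
    }

    // Keeps the highest-paid employee for each key produced by keySelector.
    static <K> Map<K, Employee> highestPaidBy(List<Employee> employees,
                                              Function<Employee, K> keySelector) {
        return employees.stream().collect(Collectors.toMap(
                keySelector,
                Function.identity(),
                BinaryOperator.maxBy(Comparator.comparingDouble((Employee e) -> e.salary))));
    }

    // Grouping by employer only: country is ignored.
    static Map<String, Employee> byEmployerOnly(List<Employee> employees) {
        return highestPaidBy(employees, e -> e.employer);
    }

    // Grouping by a composite key: one group per (country, employer) pair.
    static Map<List<String>, Employee> byCountryAndEmployer(List<Employee> employees) {
        return highestPaidBy(employees, e -> Arrays.asList(e.country, e.employer));
    }

    // Made-up sample data.
    static List<Employee> sample() {
        return Arrays.asList(
                new Employee("US", "Acme", "Alice", 120),
                new Employee("US", "Acme", "Bob", 100),
                new Employee("US", "Globex", "Carol", 90),
                new Employee("DE", "Acme", "Dieter", 150),
                new Employee("DE", "Globex", "Eva", 80));
    }

    public static void main(String[] args) {
        System.out.println("groups by employer only: " + byEmployerOnly(sample()).size());
        System.out.println("groups by (country, employer): " + byCountryAndEmployer(sample()).size());
    }
}
```

With this sample data, employer-only grouping yields two groups, while the composite key yields four, one per (country, employer) pair.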
      

      【Comments】:

      • This also works fine: KeyedStream employeesKeyedByCountryndEmployer = streamEmployee.keyBy("country","Employer"); Thank you, you made my day.
      • How can we query a stream keyed by multiple fields using the queryable-client?
      • Hi Fabian, could you help me with this question? stackoverflow.com/questions/61922101/…