【问题标题】:Flink InvalidTypesException: Type of TypeVariable 'K' in 'class' could not be determinedFlink InvalidTypesException:无法确定“类”中类型变量“K”的类型
【发布时间】:2015-11-19 21:22:04
【问题描述】:

Flink 0.10.0 最近刚刚发布。我需要从 0.9.1 迁移一些代码。但出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException:无法确定“class fi.aalto.dmg.frame.FlinkPairWorkloadOperator”中类型变量“K”的类型。这很可能是类型擦除问题。类型提取目前仅在返回类型中的所有变量都可以从输入类型推导出来的情况下支持具有泛型变量的类型。

代码如下:

 public class FlinkPairWorkloadOperator<K,V> implements PairWorkloadOperator<K,V> {

    private DataStream<Tuple2<K, V>> dataStream;

    public FlinkPairWorkloadOperator(DataStream<Tuple2<K, V>> dataStream1) {
        this.dataStream = dataStream1;
    }



    public FlinkGroupedWorkloadOperator<K, V> groupByKey() {
        KeyedStream<Tuple2<K, V>, K> keyedStream = this.dataStream.keyBy(new KeySelector<Tuple2<K, V>, K>() {
            @Override
            public K getKey(Tuple2<K, V> value) throws Exception {
                return value._1();
            }
        });
        return new FlinkGroupedWorkloadOperator<>(keyedStream);
    }
}

要了解 InvalidTypesException 是如何发生的,我有另一个示例也抛出此异常,但我对此一无所知。在这个演示中,程序使用 scala.Tuple2,但不使用 flink Tuple2。

public class StreamingWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter());

        DataStream<Tuple2<String, Integer>> pairs = mapToPair(counts, mapToStringIntegerPair);
        pairs.print();
        env.execute("Socket Stream WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String sentence, Collector<String> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(word);
            }
        }
    }

    public static  <K,V,T> DataStream<Tuple2<K,V>> mapToPair(DataStream<T> dataStream , final MapPairFunction<T, K, V> fun){
        return dataStream.map(new MapFunction<T, Tuple2<K, V>>() {
            @Override
            public Tuple2<K, V> map(T t) throws Exception {
                return fun.mapPair(t);
            }
        });
    }

   public interface MapPairFunction<T, K, V> extends Serializable {
     Tuple2<K,V> mapPair(T t);
  }

  public static MapPairFunction<String, String, Integer> mapToStringIntegerPair = new MapPairFunction<String, String, Integer>() {
       public Tuple2<String, Integer> mapPair(String s) {
            return new Tuple2<String, Integer>(s, 1);
        }
    };
}

【问题讨论】:

标签: apache-flink flink-streaming


【解决方案1】:

问题是您使用scala.Tuple2 而不是org.apache.flink.api.java.tuple.Tuple2 与Flink 的Java API 结合使用。 Java API 的TypeExtractor 不理解 Scala 元组。因此无法提取类型变量K的类型。

如果您改用org.apache.flink.api.java.tuple.Tuple2,那么TypeExtractor 将能够解析类型变量。

【讨论】:

  • 感谢您的帮助。如果我使用 scala.Tuple2,有没有办法解决这个问题?比如使用ResultTypeQueryable?因为我有一个高级 API 也需要在 Spark 中实现,它需要 scala.Tuple2。
  • 嗨,我刚刚更新了我的问题。我有一个演示它适用于 scala.Tuple2,但使用 flink Tuple2 得到相同的异常。
  • 为什么不直接使用 Flink 的 Scala API 呢?有了它,您可以轻松处理scala.Tuples。即使使用ResultTypeQueryable,您也无法解决Java API 中的问题,因为您必须找到无法从GenericTypeInfo[scala.Tuple2] 获得的TypeInformation[K]。您必须将TypeInformation[K] 显式传递给FlinkPairWorkloadOperator
  • 供您观察。您的第二个程序与scala.Tuples 一起使用,因为在您的程序中没有任何时候您可以访问这些元素。因此,不需要找出元组字段类型。它不适用于org.apache.flink.api.java.tuple.Tuple2,因为对于这种类型,Flink 试图找出元组字段类型,即KV。正如异常所说“无法确定 ... 中的 TypeVariable 'K' 的类型 .... 类型提取目前仅在返回类型中的所有变量都可以从输入类型推导出的情况下支持具有泛型变量的类型( s)"
  • KV 不是输入类型。它们来源于MapPairFunctionfun,Flink 无法解析。
猜你喜欢
  • 2015-01-18
  • 1970-01-01
  • 1970-01-01
  • 2016-06-10
  • 2016-03-29
  • 1970-01-01
  • 1970-01-01
  • 2016-02-29
相关资源
最近更新 更多