【问题标题】:Apache Flink - can't compile KeyedOneInputStreamOperatorTestHarness with Int or Long generic parametersApache Flink - 无法使用 Int 或 Long 泛型参数编译 KeyedOneInputStreamOperatorTestHarness
【发布时间】:2021-12-31 14:44:57
【问题描述】:

当我想使用带有 Int 或 Long 作为键 (K) 参数的 KeyedOneInputStreamOperatorTestHarness[K, IN, OUT] 时,下面提到的 Apache Flink 测试代码无法在 Scala 中编译。可以使用 STRING、INTEGER 或 java.lang.Long 类型的泛型参数编译相同的代码:

import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test

case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelector[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

class RideTests {
  private def setupHarness(function: KeyedProcessFunction[Int, TaxiRide, Int]): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)

    val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

    testHarness.setup()
    testHarness.open()

    testHarness
  }
}

构建错误:

type mismatch;
 found   : org.example.RideKeySelector
 required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)

我从以下 Maven 原型创建了项目:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-quickstart-scala</artifactId>
    <version>1.14.2</version>
</dependency>

我在 IntelliJ IDEA 的 pom.xml 中添加了以下测试依赖项:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.14.2</version>    
        <scope>test</scope>    
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.14.2</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
</dependencies>

我尝试用 lambda 替换键选择器:

val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

构建错误变为:

overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
  (x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
 cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
     testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)

好像不能接受Scala类型,为什么会这样?

【问题讨论】:

    标签: scala apache-flink


    【解决方案1】:

    参数化类型与一个名为 type variance 的概念相关联。

    如果您有一个带有类型参数AWrapper,那么您的Wrapper 可以是invariantcovariantcontravariant,分别对应于类型A

    invariant :: 类型 Wrapper[A]Wrapper[B] 根本不关心 AB 之间的类型关系。 Wrapper[A]Wrapper[B] 之间根本没有关系。

    covariant :: 给定一个类型B,它是另一个类型A的子类型(读作B extends A),那么Wrapper[B]也将是Wrapper[A]的子类型(读作@ 987654342@).

    contravariant :: 给定一个类型B,它是另一个类型A的子类型(读作B extends A),那么Wrapper[B]将是Wrapper[A]的超类型(读作@987654349 @)。

    Java 仅允许 invariant 类型参数,而 Scala 允许您根据需要选择 variance,使用 Wrapper[A] 代表 invariantWrapper[+A] 代表 covariantWrapper[-A] 代表 contravariant 关系。

    因此,对于在 Java 中实现的任何参数化类型 Wrapper[T]Wrapper[A]Wrapper[B] 将没有任何关系,无论 AB 之间是否存在任何关系。

    由于KeyedOneInputStreamOperatorTestHarness是用Java实现的,所以RideKeySelectorKeySelector[TaxiRide, Int]之间的关系是invariant

    虽然在许多情况下,当您在 covariant 处处理 invariant 类型参数时,您无需担心 variance,但事情只是巧合地工作,但在处理 invariant 类型时它们几乎从不工作contravariant 处的参数。

    但是当它起作用时,你必须记住,在任何contravariant 地方都没有使用该类型只是一个巧合,你不应该依赖它工作,因为开发人员从来没有打算这样做。在未来的版本中,它可能会通过在类中额外添加一个 contravariant 来改变。

    这里就是这样,KeyedOneInputStreamOperatorTestHarness&lt;K, IN, OUT&gt; 的第二个类型参数In 自然地用在contravariantKeyedOneInputStreamOperatorTestHarness 中的地方。

    至于重载方法错误,那是因为KeyedOneInputStreamOperatorTestHarness&lt;K, IN, OUT&gt;的构造函数想要一个KeySelector&lt;IN, K&gt;。因此KeyedOneInputStreamOperatorTestHarness&lt;Int, TaxiRide, Int&gt; 需要KeySelector&lt;TaxiRide, Int&gt; 而不是scala.Function1[TaxiRide, Int],因为您在下一行中提供了它,

    val testHarness = 
      new KeyedOneInputStreamOperatorTestHarness(
        operator,
       (ride: TaxiRide) => ride.rideId,
       Types.INT
    )
    

    现在,您只需要提供适当的实例。

    val rideKeySelector = new KeySelector[TaxiRide, Int] {
      @throws(classOf[Exception])
      override def getKey(value: TaxiRide): Int = value.rideId
    }
    
    val testHarness = 
      new KeyedOneInputStreamOperatorTestHarness(
        operator,
       rideKeySelector,
       Types.INT
    )
    

    我还建议您在处理类型密集型 Java 库时干脆放弃使用Int(而干脆使用java.lang.Integer)。 Scala 的 Int 与 Java 的 intInteger 之间的交互错综复杂,足以写出一本完整的书。

    【讨论】:

    • 你最后的代码 sn-p 也没有编译,它实际上与我的类 RideKeySelector 非常相似。谢谢。
    • 我不知道这个KeyedOneInputStreamOperatorTestHarness 来自您的代码中的哪个依赖项。它不存在于您提供的 4 个依赖项中的任何一个中。此外,这应该为您提供有关如何解决您所描述的具体问题的更多背景信息。
    • KeyedOneInputStreamOperatorTestHarness 来自flink-streaming-java_2.11。其他未提及的部分来自原型flink-quickstart-scala。根据您的精彩描述,有问题的部分可能是 TypeInformation&lt;Integer&gt; intType = Types.INT; 我希望在 flink-streaming-scala_2.11 或其他地方应该有一个 Scala 对应的 KeyedOneInputStreamOperatorTestHarness
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-10
    • 1970-01-01
    • 1970-01-01
    • 2017-01-28
    • 1970-01-01
    相关资源
    最近更新 更多