【Posted】: 2021-12-31 14:44:57
【Question】:
The Apache Flink test code below does not compile in Scala when I use KeyedOneInputStreamOperatorTestHarness[K, IN, OUT] with Int or Long as the key (K) type parameter. The same code compiles when the generic parameter is String, Integer, or java.lang.Long:
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.operators.KeyedProcessOperator
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness
import org.apache.flink.streaming.api.scala._
import org.junit.Test
case class TaxiRide(rideId: Int)

class RideKeySelector extends KeySelector[TaxiRide, Int] {
  override def getKey(in: TaxiRide): Int = in.rideId
}

class RideTests {
  private def setupHarness(function: KeyedProcessFunction[Int, TaxiRide, Int]): KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] = {
    val operator: KeyedProcessOperator[Int, TaxiRide, Int] = new KeyedProcessOperator(function)
    val testHarness: KeyedOneInputStreamOperatorTestHarness[Int, TaxiRide, Int] =
      new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
    testHarness.setup()
    testHarness.open()
    testHarness
  }
}
Build error:
type mismatch;
found : org.example.RideKeySelector
required: org.apache.flink.api.java.functions.KeySelector[org.example.TaxiRide,Int]
new KeyedOneInputStreamOperatorTestHarness(operator, new RideKeySelector(), Types.INT)
I created the project from the following Maven archetype:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-quickstart-scala</artifactId>
  <version>1.14.2</version>
</dependency>
In IntelliJ IDEA, I added the following test dependencies to pom.xml:
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.11</artifactId>
    <version>1.14.2</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime</artifactId>
    <version>1.14.2</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.14.2</version>
    <scope>test</scope>
    <classifier>tests</classifier>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.14.2</version>
    <scope>test</scope>
    <classifier>tests</classifier>
  </dependency>
</dependencies>
I tried replacing the key selector with a lambda:
val testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
The build error then changed to:
overloaded method constructor KeyedOneInputStreamOperatorTestHarness with alternatives:
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: org.apache.flink.runtime.operators.testutils.MockEnvironment)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K])org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.StreamOperatorFactory[OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT] <and>
(x$1: org.apache.flink.streaming.api.operators.OneInputStreamOperator[IN,OUT],x$2: org.apache.flink.api.java.functions.KeySelector[IN,K],x$3: org.apache.flink.api.common.typeinfo.TypeInformation[K],x$4: Int,x$5: Int,x$6: Int)org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness[K,IN,OUT]
cannot be applied to (org.apache.flink.streaming.api.operators.KeyedProcessOperator[Int,org.example.TaxiRide,Int], org.example.TaxiRide => Integer, org.apache.flink.api.common.typeinfo.TypeInformation[Integer])
testHarness = new KeyedOneInputStreamOperatorTestHarness(operator, (ride: TaxiRide) => ride.rideId, Types.INT)
It looks like Scala types such as Int are not accepted as the key type. Why is that?
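One possible reading of the second error: the third argument is reported as TypeInformation[Integer], while the harness is declared with K = Int, and the same K has to match in both the key selector and the type information. The sketch below reproduces that shape with hypothetical stand-in types (KeySelectorLike, TypeInfoLike, and HarnessLike are not Flink classes) so it can be compiled without any Flink dependency. It only illustrates the mismatch between Int and java.lang.Integer in an invariant type parameter and is not a confirmed diagnosis.

```scala
// Hypothetical stand-ins that mirror the shapes in the error message.
// None of these are Flink classes.
trait KeySelectorLike[IN, K] {
  def getKey(in: IN): K
}

// Invariant in K, like a Java generic class seen from Scala.
final class TypeInfoLike[K](val name: String)

// K must be the same type in the selector and in the type information.
final class HarnessLike[K, IN](selector: KeySelectorLike[IN, K], keyType: TypeInfoLike[K]) {
  def keyOf(in: IN): K = selector.getKey(in)
  def keyTypeName: String = keyType.name
}

final case class Ride(rideId: Int)

object KeyTypeSketch {
  // Type information for the boxed Java type and for the Scala type.
  val boxedInt: TypeInfoLike[java.lang.Integer] =
    new TypeInfoLike[java.lang.Integer]("java.lang.Integer")
  val scalaInt: TypeInfoLike[Int] = new TypeInfoLike[Int]("Int")

  val byRideId: KeySelectorLike[Ride, Int] = new KeySelectorLike[Ride, Int] {
    override def getKey(in: Ride): Int = in.rideId
  }

  // Compiles: K = Int for both the selector and the type information.
  val harness: HarnessLike[Int, Ride] = new HarnessLike[Int, Ride](byRideId, scalaInt)

  // Does not compile (type mismatch), because TypeInfoLike[java.lang.Integer]
  // is not a TypeInfoLike[Int]:
  //   new HarnessLike[Int, Ride](byRideId, boxedInt)

  def main(args: Array[String]): Unit = {
    println(harness.keyOf(Ride(7))) // prints 7
    println(harness.keyTypeName)    // prints Int
  }
}
```

If the same reasoning applies to the real harness, passing a TypeInformation[Int] instead of Types.INT might resolve it, for example createTypeInformation[Int] from the already imported org.apache.flink.streaming.api.scala package. That is an untested assumption here.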
【Discussion】:
Tags: scala apache-flink