【Posted】: 2020-02-27 22:00:22
【Question】:
Below is my SBT configuration:
// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.1"
// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.1"
// https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-scala-bridge
libraryDependencies += "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.1"
// https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
libraryDependencies += "org.apache.flink" %% "flink-table-planner" % "1.9.1"
// https://mvnrepository.com/artifact/org.apache.flink/flink-table-common
libraryDependencies += "org.apache.flink" % "flink-table-common" % "1.9.1"
// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-base
//libraryDependencies += "org.apache.flink" %% "flink-connector-kafka_2.11" % "1.9.1" -- This does not work; it fails with an "unable to connect" error.
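Since I could not add flink-connector-kafka through sbt, I downloaded the jar and placed it in a lib folder inside my sbt project. The sbt project was created through IntelliJ; the only manual change was adding the lib folder.
Now the Kafka connector import, i.e. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer, resolves fine.
Below is my code for consuming from Kafka: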
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._
object KafkaFlink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    // properties.setProperty("bootstrap.servers", "localhost:9092")
    // // only required for Kafka 0.8
    // properties.setProperty("zookeeper.connect", "localhost:2181")
    // properties.setProperty("group.id", "test")
    val properties1 = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")
    val topic = "flink-fault-testing"
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), properties1)
    val value: DataStream[String] = env.addSource(flinkKafkaConsumer)
  }
}
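I cannot compile this, because I get the error: Cannot resolve overloaded method 'addSource'.
Please tell me what I am doing wrong.
Also, is there a way to get the universal flink-kafka connector directly through build.sbt in IntelliJ?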
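For reference, a minimal sketch of how the connector dependency might be declared in build.sbt, assuming the project's Scala binary version is 2.11 or 2.12 (the versions for which Flink 1.9.1 publishes this artifact). With %%, sbt appends the Scala suffix to the artifact name itself, so the name is written without _2.11; the commented-out line in the configuration above uses both %% and an explicit _2.11, which makes sbt look for a doubly suffixed artifact.

```scala
// build.sbt (sketch, assumes scalaVersion is a 2.11.x or 2.12.x release)
// %% appends the Scala binary suffix (e.g. _2.11), so it is left out of the artifact name.
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.1"
```

Declaring the connector this way also lets sbt fetch its transitive dependencies (such as flink-connector-kafka-base and kafka-clients), which a single jar copied into lib does not bring along; that may be related to the addSource error, though this is not confirmed here.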
【Comments】:
Tags: apache-flink flink-streaming