【问题标题】:Read from Kafka into Flink Scala Shell从 Kafka 读取到 Flink Scala Shell
【发布时间】:2019-08-01 13:13:07
【问题描述】:

我正在尝试在 Flink (1.7.2) 附带的 scala-shell 中连接到本地计算机上的 Kafka (2.1) 并从中读取数据。

这就是我正在做的事情:

:require flink-connector-kafka_2.11-1.7.1.jar
:require flink-connector-kafka-base_2.11-1.7.1.jar

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
import java.util.Properties

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

在最后一条语句之后,我收到以下错误:

scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()
<console>:69: error: overloaded method value addSource with alternatives:
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext[T] => Unit)(implicit evidence$10: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T] <and>
  [T](function: org.apache.flink.streaming.api.functions.source.SourceFunction[T])(implicit evidence$9: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.streaming.api.scala.DataStream[T]
 cannot be applied to (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer[String])
   var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print()

我创建了名为“topic”的主题,并且能够通过另一个客户端正确地生成和读取消息。我正在使用 java 版本 1.8.0_201 并按照 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html 的说明进行操作。

对可能出现的问题有任何帮助吗?

【问题讨论】:

    标签: scala apache-kafka apache-flink scala-shell


    【解决方案1】:

    某些依赖项隐含地需要其他依赖项。我们通常使用一些依赖管理器,例如 mavensbt,当我们在项目中添加一些依赖时,依赖管理器会在后台提供其隐式依赖。

    另一方面,当您使用没有依赖项管理器的 shell 时,您有责任提供代码依赖项。使用 Flink Kafka 连接器明确需要 Flink Connector Kafka jar,但您应该注意到 Flink Connector Kafka 也需要一些依赖项。您可以在page 底部找到它的依赖项,它位于Compile Dependencies 部分中。所以从这个前言开始,我在FLINK_HOME/lib(Flink classpath)目录下添加了如下jar文件:

    flink-connector-kafka-0.11_2.11-1.4.2.jar
    flink-connector-kafka-0.10_2.11-1.4.2.jar    
    flink-connector-kafka-0.9_2.11-1.4.2.jar   
    flink-connector-kafka-base_2.11-1.4.2.jar  
    flink-core-1.4.2.jar                                         
    kafka_2.11-2.1.1.jar
    kafka-clients-2.1.0.jar
    

    我可以在 Flink shell 中使用以下代码成功使用 Kafka 消息:

    scala> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    
    scala> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    
    scala> import java.util.Properties
    import java.util.Properties
    
    scala> val properties = new Properties()
    properties: java.util.Properties = {}
    
    scala> properties.setProperty("bootstrap.servers", "localhost:9092")
    res0: Object = null
    
    scala> properties.setProperty("group.id", "test")
    res1: Object = null
    
    scala> val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()
    warning: there was one deprecation warning; re-run with -deprecation for details
    stream: org.apache.flink.streaming.api.datastream.DataStreamSink[String] = org.apache.flink.streaming.api.datastream.DataStreamSink@71de1091
    
    scala> senv.execute("Kafka Consumer Test")
    Submitting job with JobID: 23e3bb3466d914a2747ae5fed293a076. Waiting for job completion.
    Connected to JobManager at Actor[akka.tcp://flink@localhost:40093/user/jobmanager#1760995711] with leader session id 00000000-0000-0000-0000-000000000000.
    03/11/2019 21:42:39 Job execution switched to status RUNNING.
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to SCHEDULED 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to DEPLOYING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
    03/11/2019 21:42:39 Source: Custom Source -> Sink: Unnamed(1/1) switched to RUNNING 
    hello
    hello
    

    另外,将一些 jar 文件添加到 Flink 类路径的另一种方法是将 jar 文件作为 Flink shell 启动命令的参数传递:

    bin/start-scala-shell.sh local "--addclasspath <path/to/jar.jar>"
    

    测试环境:

    Flink 1.4.2
    Kafka 2.1.0
    Java  1.8 201
    Scala 2.11
    

    【讨论】:

    • 哇。非常感谢您的解释和示例。我会运行它并检查。我认为所有依赖项都得到了满足,因为没有任何丢失的类或依赖项错误。很有帮助!
    • 一开始,我遇到了和你一样的错误。只需下载我列出的所有依赖项并将它们放在 FLINK_HOME/lib 中。我希望这会有所帮助。
    • @div93 由于我使用的是 Flink 1.4.2,所以我得到了它的相应依赖项。您应该获得相应版本 1.7.2 的依赖项。
    【解决方案2】:

    您很可能应该在添加源之前导入 Flink 的 Scala 隐式:

    import org.apache.flink.streaming.api.scala._
    

    【讨论】:

    • 我可以看到的另一件事是版本不匹配(kafka 连接器中的 1.7.2 与 1.7.1),也许统一这会有所帮助?
    • 我用 v1.7.1 测试过。它没有解决这个错误,而是引入了一个新错误:scala> var stream = senv.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)).print() error: error while loading函数,类文件 '/Users/path/flink-1.7.1/lib/flink-dist_2.11-1.7.1.jar(org/apache/flink/api/common/functions/Function.class)' 没有位置匹配其内容:包含类函数 :69: 错误:重载方法值 addSource 与替代:
    猜你喜欢
    • 2016-11-08
    • 2017-05-06
    • 2017-03-09
    • 1970-01-01
    • 1970-01-01
    • 2018-01-31
    • 1970-01-01
    • 2018-02-01
    • 2018-05-09
    相关资源
    最近更新 更多