【问题标题】:Flink with Kafka IntegrationFlink 与 Kafka 集成
【发布时间】:2021-05-15 01:48:30
【问题描述】:

我正在尝试将 Flink 与 Kafka 集成并从 Kafka 生产者那里读取数据。我正在尝试按照 flink-docs-release-1.11 文档中的代码运行以下代码

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class Flink_Kafka_Integration {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
    
    
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);
    
    }
}

我收到以下错误,

The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer<String>)
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files

我在我的项目构建路径中包含了一个名为 flink-streaming-java_2.12-1.11.3.jar 的 jar 文件。

任何建议都会有所帮助。

以下是我正在使用的软件版本:

Flink - 1.11.3

斯卡拉 - 2.12

flinkKafkaConsumer-2.12

【问题讨论】:

  • 你应该使用 Maven 或 Gradle 而不是手动添加 jar 文件的地方

标签: java maven apache-kafka apache-flink


【解决方案1】:

您需要在项目构建中包含flink-connector-kafka_2.12-1.11.3.jar

【讨论】:

  • 我也包含了那个 jar 文件。
猜你喜欢
  • 2017-01-23
  • 1970-01-01
  • 2018-05-26
  • 2018-01-08
  • 2018-09-25
  • 1970-01-01
  • 1970-01-01
  • 2020-04-15
  • 2017-06-27
相关资源
最近更新 更多