【发布时间】:2021-05-15 01:48:30
【问题描述】:
我正在尝试将 Flink 与 Kafka 集成并从 Kafka 生产者那里读取数据。我正在尝试按照 flink-docs-release-1.11 文档中的代码运行以下代码
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
public class Flink_Kafka_Integration {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
}
}
我收到以下错误,
The method addSource(SourceFunction<OUT>) in the type StreamExecutionEnvironment is not applicable for the arguments (FlinkKafkaConsumer<String>)
The type org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase cannot be resolved. It is indirectly referenced from required .class files
我在我的项目构建路径中包含了一个名为 flink-streaming-java_2.12-1.11.3.jar 的 jar 文件。
任何建议都会有所帮助。
以下是我正在使用的软件版本:
Flink - 1.11.3
斯卡拉 - 2.12
flinkKafkaConsumer-2.12
【问题讨论】:
-
你应该使用 Maven 或 Gradle 而不是手动添加 jar 文件的地方
标签: java maven apache-kafka apache-flink