【问题标题】:Reading data into Flink Program from Kafka console producer从 Kafka 控制台生产者读取数据到 Flink 程序
【发布时间】:2018-01-31 04:34:04
【问题描述】:

我创建了一个名为test 的主题,并使用控制台生产者在控制台中编写了一些字符串。

./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092

幸运的是,我可以使用console-consumer 读取控制台中生成的数据。现在,我想使用下面的代码在 Flink 程序中使用 console-consumer 的输出

public class ReadFromKafka {


    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");


        DataStream<String> message = env.addSource(new FlinkKafkaConsumer08<String>("test", new SimpleStringSchema(),properties));

        message.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return " Value: " + value;
            }
        }).print();

        env.execute();


    } //main
} //ReadFromKafka

POM.XML 内容如下

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>org.stsffap</groupId>
    <artifactId>cep-monitoring</artifactId>
    <name>cep-monitoring</name>
    <version>1.0</version>

    <packaging>jar</packaging>

    <properties>
        <flink.version>1.0.1</flink.version>

        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>
<dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.3.2</version>
    </dependency>

        <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.2</version>
    </dependency>


<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.10</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
            <version>1.3.2</version>

        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>

                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerId>jdt</compilerId>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.eclipse.tycho</groupId>
                        <artifactId>tycho-compiler-jdt</artifactId>
                        <version>0.21.0</version>
                    </dependency>
                </dependencies>
            </plugin>




        </plugins>
    </build>
</project>

每当我执行此代码时,都会出现以下错误

objc[892]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/bin/java (0x109f654c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10afd44e0). One of the two will be used. Which one is undefined.
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedFunction
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.stsffap.cep.monitoring.ReadFromKafka.main(ReadFromKafka.java:24)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 25 more

也是 Kafka 的版本,我正在使用通过以下命令找到的

find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'

kafka_2.11-0.9.0.0-javadoc.jar

我需要使用 .8.x 版本的 Kafka 来运行我的示例吗?

非常感谢您的评论和建议。提前致谢。 祝你好运!

【问题讨论】:

    标签: maven apache-kafka apache-flink flink-streaming apache-kafka-connect


    【解决方案1】:

    我的程序通过进行以下更改开始工作,我将 Kafka 版本更新为 .9.x

    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
                <version>${flink.version}</version>
    
            </dependency>
    

    我将 Flink 版本从 1.0.1 升级到 1.1.2 ,如下图

    <properties>
            <!-- <flink.version>1.0.1</flink.version>-->
    
            <flink.version>1.1.2</flink.version>
            <slf4j.version>1.7.7</slf4j.version>
            <log4j.version>1.2.17</log4j.version>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
            <dependency>
    
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
        </dependency>
    

    【讨论】:

    • 如果这解决了你的问题,你能把它标记为解决答案吗?
    猜你喜欢
    • 2016-05-24
    • 2016-11-08
    • 2017-02-28
    • 2016-07-19
    • 2018-06-16
    • 2019-09-24
    • 2016-06-17
    • 2021-01-09
    • 2021-12-16
    相关资源
    最近更新 更多