【发布时间】:2021-12-17 15:38:46
【问题描述】:
我正在编写一个 Java 项目,以使用 Apache Flink API 在 Dockerized MySQL 数据库中创建一个表。根据文档提供的内容,我编写了以下代码行来定义流环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); //<== runtime error
在输入 docker-compose build 和 docker-compose up 后,脚本会在几秒钟后挂起,原因如下:
线程“main”org.apache.flink.table.api.TableException 中的异常:无法实例化执行程序。确保规划器模块位于类路径中
在几十行之后,运行时错误主要由以下因素驱动:
app_1 |原因:org.apache.flink.table.api.ValidationException:在类路径中找不到任何实现 'org.apache.flink.table.delegation.ExecutorFactory' 的工厂。
好像程序无法链接正确的依赖关系或类似的东西。我使用的 pom.xml 文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mycompany</groupId>
<artifactId>first-stream</artifactId>
<version>maven-project</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc</artifactId>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.27</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.14.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.14.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
<mainClass>com.example.first_stream.WindowWordCount</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
而Java代码如下:
package com.example.first_stream;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.api.java.typeutils.TypeExtractor;
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.execute();
}
}
经过多次尝试,我没有从文档中确定崩溃的原因,也没有仔细检查 Maven pom.xml。关于如何处理这种情况的任何想法?谢谢。
【问题讨论】:
-
你能和我们分享你的 pom.xml。您使用的是哪个 Flink 版本?规划器是 dist 的一部分,但也可以包含在项目本身中。在任何情况下,版本都需要匹配。
-
我正在使用 Flink 1.14.0,并且在 pom.xml
<dependencies>正文中定义的规划器与该版本匹配。我只是在上面添加了 pom.xml 文件。 -
可能是因为某些 Flink 依赖项应该有 provided 范围?
-
我也认为提供的范围应该足够了。如果没有,请确保在 JAR 文件中填写
META-INF/services/org.apache.flink.table.factories.Factory,将计划程序注册为 Java Service Loader 服务。存在用于合并服务文件的 Maven 转换器插件。 -
我也尝试设置我的 Flink 依赖项provided,但是在设置 Stream Execution Environment 时,甚至在定义之前,由于程序在第一行挂起,情况恶化了流表:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment。 @twalthr,我很抱歉,但我没有明白这一点。我是否应该在 pom.xml 中导入/修改某些内容以执行您的建议?
标签: java docker apache-flink flink-streaming