TL;DR:在 IDE 中本地运行时,Flink 使用带有 LocalEnvironment 或 TestEnvironment(用于单元测试)的 MiniCluster。不幸的是,即使设置了这些变量,它们似乎也不会使用本地环境变量,否则无法配置它们。
我遇到了同样的问题,想在我的 IDE(VS 代码)中运行 Flink 并安装插件。我使用 (scala) sys.env.get(ConfigConstants.ENV_FLINK_PLUGINS_DIR) 来验证环境变量是否设置正确。此外,我咨询了 unit test in Flink's source 并发现了两件事:首先,尽管设置了环境变量,但 PluginConfig 中的值仍然是默认值。其次,像在单元测试中所做的那样覆盖该值并不能防止由于找不到插件而导致的错误,但它确实会更改 PluginConfig.getPluginsDir 的返回值。这可能是因为必须在启动时设置插件目录才能找到和加载插件。
查看Flink documenation 好像可以配置环境,但是我的尝试没有成功。
val conf: Configuration = new Configuration();
conf.setString(ConfigConstants.ENV_FLINK_PLUGINS_DIR, "C:/Users/ivwebber/Source/MDPLocal/private/MapsAI/projects/TrafficInference/Modules/traffic-forecast-pipeline/plugins/");
val env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
我认为这种方法的问题在于它不是可以在conf/flink-conf.yaml 中设置的值。我查看了in the source,发现该值从未被复制到结果配置中。参见here too。
这个question可能是相关的。
这个question可能是相关的。
我可能花了比研究它更多的时间,所以我认为提交功能请求/错误将是下一步。考虑到标记“ENV_FLINK_PLUGINS_DIR”在源代码中只出现了 4 次;我真的想知道它是否从未阅读或使用过。