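【Posted】: 2021-02-04 08:00:35
【Question】:
I'm trying to run a Python Beam job on the Flink runner. I started a minikube cluster on my local machine and port-forwarded 8081 so that I can view the Flink Dashboard at localhost:8081. Everything on the deployed Flink cluster appears to be running, but when I try to execute a basic pipeline (which runs fine on the DirectRunner), I get this stack trace on the Completed Jobs page of the Flink Dashboard: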
Job failed during initialization of JobManager
org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
My Beam pipeline looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    # Submit to the Flink cluster whose REST port is forwarded to localhost:8081.
    options = PipelineOptions([
        "--runner=FlinkRunner",
        "--flink_master=localhost:8081",
        # "--environment_type=LOOPBACK"
    ])
    with beam.Pipeline(options=options) as p:
        lines = (
            p
            | 'create' >> beam.Create([
                'This is a test',
                'This is another line',
                'Oh look another'
            ])
            | beam.Map(print)
        )


if __name__ == "__main__":
    run()
I can't seem to find a solution to this problem. The Flink version is v1.12.0.
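One thing that may be worth ruling out is a mismatch between the Flink version running on the cluster and the Flink version the Beam job is built for. This is only an assumption about the cause, since nothing in the stack trace confirms it. The sketch below reads the cluster version from Flink's REST `/config` endpoint and builds an options list that states the version explicitly through Beam's `--flink_version` option. The helper names (`flink_version_from_config`, `major_minor`, `build_flink_args`, `fetch_flink_version`) are hypothetical and exist only for illustration. Whether the installed Beam release accepts `1.12` depends on that release and should be checked against the Beam Flink runner compatibility table.

```python
import json
from urllib.request import urlopen


def flink_version_from_config(payload: str) -> str:
    """Return the 'flink-version' field from the JSON body of Flink's /config endpoint."""
    return json.loads(payload)["flink-version"]


def major_minor(version: str) -> str:
    """Reduce a version such as 'v1.12.0' or '1.12.0' to 'major.minor' ('1.12')."""
    return ".".join(version.lstrip("v").split(".")[:2])


def build_flink_args(flink_master: str, flink_version: str, loopback: bool = False) -> list:
    """Build the argument list that would be passed to PipelineOptions (hypothetical helper)."""
    args = [
        "--runner=FlinkRunner",
        f"--flink_master={flink_master}",
        # Assumption: pinning the version explicitly avoids relying on Beam's default.
        f"--flink_version={major_minor(flink_version)}",
    ]
    if loopback:
        # Same option that is commented out in the pipeline from the question.
        args.append("--environment_type=LOOPBACK")
    return args


def fetch_flink_version(rest_url: str = "http://localhost:8081") -> str:
    """Query the running cluster; requires the port-forward described in the question."""
    with urlopen(f"{rest_url}/config", timeout=5) as resp:
        return flink_version_from_config(resp.read().decode("utf-8"))
```

With the port-forward active, `build_flink_args("localhost:8081", fetch_flink_version())` yields a list that can be passed to `PipelineOptions` in place of the hard-coded one.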
【Discussion】:
Tags: python apache-flink apache-beam