【问题标题】:Job failed during initialization of JobManager from Flink Dashboard从 Flink Dashboard 初始化 JobManager 期间作业失败
【发布时间】:2021-02-04 08:00:35
【问题描述】:

我正在尝试在 Flink Runner 上运行 Python BEAM 作业。我在本地机器上启动了一个 minikube 集群,端口转发 8081 以查看 localhost:8081 上的 Flink Dashboard。已部署的 flink 集群上的所有内容似乎都在运行,但是当我尝试执行基本管道(在 DirectRunner 上运行良好)时,我在 Flink 已完成作业仪表板上收到此堆栈跟踪错误:

Job failed during initialization of JobManager
org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException

我的光束管道是这样的

    from apache_beam.options.pipeline_options import PipelineOptions
    
    def run():
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_master=localhost:8081",
            # "--environment_type=LOOPBACK"
        ])
    
        with beam.Pipeline(options=options) as p:
            lines = (
                p
                | 'create' >> beam.Create([
                    'This is a test',
                    'This is another line',
                    'Oh look another'
                ])
                | beam.Map(print)
            )
    
    if __name__ == "__main__":
        run()

似乎找不到解决此问题的方法。 Flink 版本为 v1.12.0。

【问题讨论】:

    标签: python apache-flink apache-beam


    【解决方案1】:

    我解决了这个问题,我没有具有所需 python3.7 SDK 的工作池

    - name: beam-worker-pool
        image: apache/beam_python3.7_sdk
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
    

    这个 sidecar 需要添加到任务管理器 kubernetes deploy 中。管道选项必须更改为

    options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_version=1.10",
            "--flink_master=localhost:8081",
            "--environment_type=EXTERNAL",
            "--environment_config=localhost:50000"
        ])
    
    

    使用 BEAM 的 SDK 工具中的 --environment_config 可以解决问题。我尝试了 Flink 1.12,但没有成功,我一直保持在 1.10。

    【讨论】:

    • 如果Flink客户端和服务器不匹配,使用Java SDK也会出现这种情况。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-14
    • 1970-01-01
    相关资源
    最近更新 更多