【问题标题】:Why is my Flink standalone-cluster not receiving my job?为什么我的 Flink 独立集群没有收到我的工作?
【发布时间】:2019-02-01 15:11:00
【问题描述】:

我在 Flink (Java) 中创建了一个程序来计算 3 个不同房间中 9 个假传感器的平均值。如果我启动 jar 文件,程序运行良好。所以我决定启动 flink 独立集群来检查运行我的 Job 的 TaskManager 和相应的任务,比如这里 (https://ci.apache.org/projects/flink/flink-docs-stable/tutorials/local_setup.html)。我在我的机器上运行所有东西。 为什么我看不到仪表板上正在运行的作业 (http://localhost:8081/#/overview) 但如果我查看日志文件 (tail -f log/flink--client--*-T430.log) 我可以看到正在处理的东西? 此外,print() 方法将输出溢出到控制台。

我用这个命令./bin/flink run examples/explore-flink.jar -c启动我的应用程序

但也许配置文件中有一些我必须配置的参数。这是我的代码:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.mqtt.MqttTemperature;
import org.sense.flink.mqtt.TemperatureMqttConsumer;

public class SensorsMultipleReadingMqttEdgentQEP {

    private boolean checkpointEnable = true;
    private long checkpointInterval = 1000;
    private CheckpointingMode checkpointMode = CheckpointingMode.EXACTLY_ONCE;

    public SensorsMultipleReadingMqttEdgentQEP() throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        if (checkpointEnable)
            env.enableCheckpointing(checkpointInterval, checkpointMode);

        DataStream<MqttTemperature> temperatureStream01 = env.addSource(new TemperatureMqttConsumer("topic-edgent-01"));
        DataStream<MqttTemperature> temperatureStream02 = env.addSource(new TemperatureMqttConsumer("topic-edgent-02"));
        DataStream<MqttTemperature> temperatureStream03 = env.addSource(new TemperatureMqttConsumer("topic-edgent-03"));
        DataStream<MqttTemperature> temperatureStreams = temperatureStream01.union(temperatureStream02)
                .union(temperatureStream03);

        DataStream<Tuple2<String, Double>> average = temperatureStreams.keyBy(new TemperatureKeySelector())
                .map(new AverageTempMapper());

        average.print();

        String executionPlan = env.getExecutionPlan();
        System.out.println("ExecutionPlan ........................ ");
        System.out.println(executionPlan);
        System.out.println("........................ ");

        // env.execute("SensorsMultipleReadingMqttEdgentQEP");
        env.execute();
    }

    public static class TemperatureKeySelector implements KeySelector<MqttTemperature, Integer> {

        private static final long serialVersionUID = 5905504239899133953L;

        @Override
        public Integer getKey(MqttTemperature value) throws Exception {
            return value.getId();
        }
    }

    public static class AverageTempMapper extends RichMapFunction<MqttTemperature, Tuple2<String, Double>> {

        private static final long serialVersionUID = -5489672634096634902L;
        private MapState<String, Double> averageTemp;

        @Override
        public void open(Configuration parameters) throws Exception {
            averageTemp = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<>("average-temperature", String.class, Double.class));
        }

        @Override
        public Tuple2<String, Double> map(MqttTemperature value) throws Exception {
            String key = "no-room";
            Double temp = value.getTemp();

            if (value.getId().equals(1) || value.getId().equals(2) || value.getId().equals(3)) {
                key = "room-A";
            } else if (value.getId().equals(4) || value.getId().equals(5) || value.getId().equals(6)) {
                key = "room-B";
            } else if (value.getId().equals(7) || value.getId().equals(8) || value.getId().equals(9)) {
                key = "room-C";
            } else {
                System.err.println("Sensor not defined in any room.");
            }
            if (averageTemp.contains(key)) {
                temp = (averageTemp.get(key) + value.getTemp()) / 2;
            } else {
                averageTemp.put(key, temp);
            }
            return new Tuple2<String, Double>(key, temp);
        }
    }
}

谢谢, 费利佩

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    在我选择选项"Extract required libraries into generated JAR" 后,它起作用了。奇怪,因为我使用选项 "Package required libraries into generated JAR" 生成 JAR,但它不起作用。

    【讨论】:

      猜你喜欢
      • 2017-06-12
      • 1970-01-01
      • 1970-01-01
      • 2021-04-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-12-20
      相关资源
      最近更新 更多