【Question Title】: How to change the operators at runtime of a Stream Dataflow program?
【Posted】: 2019-05-21 00:34:48
【Question Description】:

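I would like to know whether I can change the operators of a job that has already been submitted to Flink. Suppose I have a word count program with a filter on top of it that only counts words longer than 3 characters. I want to change the parameter of this filter at runtime. My first guess is that Flink (and other dataflow engines such as Spark, Storm, and Apache Edgent) cannot do this, because the job has already been submitted at env.execute(). Does anyone know of a way to do it?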

I guess this question (Deploy stream processing topology on runtime?) is related to what I want, but the solution there is still not as dynamic as I would like.

Thanks

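// Word count over a socket stream; the filter is fixed once the job is submitted.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Tuple2<String, Integer>> dataStream = env.socketTextStream("localhost", 9000)
        .flatMap(new SplitterFlatMap()).keyBy(0)
        .sum(1)
        // Note: f1 is the running count, so as written this keeps words seen at least 3 times.
        .filter(word -> word.f1 >= 3);
dataStream.print();
env.execute("WordCountSocketFilterQEP");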

【Question Comments】:

    Tags: apache-spark apache-kafka apache-flink apache-storm apache-edgent


    【Solution 1】:

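    With Flink you can connect a broadcast stream to your keyed stream and broadcast in the parameters or code that you want to use. TaxiQuery is an example that uses Janino with Java expressions, but you could probably load a class dynamically instead. I have also seen this done with Rhino/JavaScript, JRuby, and so on.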
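
A minimal sketch of the "load a class dynamically" idea, written as plain Java without any Flink dependency. It assumes the control message carries only a class name and that the rule classes are already on the classpath; `DynamicRuleLoader`, `LongerThanThree`, and `LongerThanFive` are hypothetical names made up for this illustration and are not part of TaxiQuery or Flink.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model: a control message names a rule class, and the operator
// swaps its predicate by loading that class reflectively.
final class DynamicRuleLoader {

    // Hypothetical rule implementations assumed to be on the classpath already.
    public static final class LongerThanThree implements Predicate<String> {
        @Override
        public boolean test(String word) {
            return word.length() > 3;
        }
    }

    public static final class LongerThanFive implements Predicate<String> {
        @Override
        public boolean test(String word) {
            return word.length() > 5;
        }
    }

    // Rule currently applied to data elements; replaced when a control message arrives.
    private Predicate<String> rule = new LongerThanThree();

    // Control side: instantiate the named class and make it the active rule.
    @SuppressWarnings("unchecked")
    void onControlMessage(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            if (!Predicate.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " is not a Predicate");
            }
            this.rule = (Predicate<String>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load rule " + className, e);
        }
    }

    // Data side: apply whichever rule is active right now.
    List<String> filter(List<String> words) {
        return words.stream().filter(rule).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        DynamicRuleLoader operator = new DynamicRuleLoader();
        List<String> words = List.of("to", "flink", "stream", "dataflow");
        System.out.println(operator.filter(words));
        operator.onControlMessage(LongerThanFive.class.getName());
        System.out.println(operator.filter(words));
    }
}
```

In a real job the `onControlMessage` logic would live in the method that handles the broadcast input, and the `filter` logic in the method that handles the data input. Loading arbitrary class names from a stream is a security risk, so an allow-list of rule names is probably a safer variant.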

    【Comments】:

    【Solution 2】:

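    For your parameterStream to send its values to every operator instance, you have to use a BroadcastStream. Note that (as of Flink 1.6?) this also lets you keep broadcast state: the "rules" or configuration settings that you send to all instances of your DynamicFilterCoFlatMapper will be saved automatically as state.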
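
To illustrate why the parameter stream has to be broadcast, here is a small plain-Java model, not Flink code. The "instances" stand in for the parallel subtasks of one operator, and `BroadcastModel`, `FilterInstance`, and `sendToOne` are hypothetical names for this sketch only. It does not model broadcast state or checkpointing; in Flink itself the corresponding building blocks are `DataStream#broadcast` and, for broadcast state, `BroadcastProcessFunction`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of broadcast vs. point-to-point delivery of a control message.
final class BroadcastModel {

    // One parallel instance of a filter operator, holding its own copy of the range.
    static final class FilterInstance {
        private double min = -1000.0;
        private double max = 1000.0;

        void updateRange(double newMin, double newMax) {
            this.min = newMin;
            this.max = newMax;
        }

        boolean accepts(double value) {
            return value >= min && value <= max;
        }
    }

    private final List<FilterInstance> instances = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new FilterInstance());
        }
    }

    // Broadcast delivery: every instance receives the new range.
    void broadcast(double min, double max) {
        for (FilterInstance instance : instances) {
            instance.updateRange(min, max);
        }
    }

    // Non-broadcast delivery: only one instance receives the new range.
    void sendToOne(int index, double min, double max) {
        instances.get(index).updateRange(min, max);
    }

    // Counts how many instances would let the value through.
    long acceptingInstances(double value) {
        return instances.stream().filter(i -> i.accepts(value)).count();
    }

    public static void main(String[] args) {
        BroadcastModel unicast = new BroadcastModel(4);
        unicast.sendToOne(0, 0.0, 50.0);
        // Three instances still use the old range and accept 75.0.
        System.out.println(unicast.acceptingInstances(75.0));

        BroadcastModel broadcast = new BroadcastModel(4);
        broadcast.broadcast(0.0, 50.0);
        // No instance accepts 75.0 any more.
        System.out.println(broadcast.acceptingInstances(75.0));
    }
}
```

Without broadcasting, whether a reading is filtered would depend on which parallel instance happens to process it, which is the inconsistency the answer is warning about.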

    【Comments】:

      【Solution 3】:

      I think that in Flink I can use a CoFlatMapFunction -> Flink: How to handle external app configuration changes in flink. But in Apache Edgent I am not sure whether there is a way to do this... Here is my implementation:

      package org.sense.flink.examples.stream;
      
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.TimeCharacteristic;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
      import org.apache.flink.util.Collector;
      import org.sense.flink.mqtt.FlinkMqttConsumer;
      import org.sense.flink.mqtt.MqttMessage;
      
      public class SensorsDynamicFilterMqttEdgentQEP {
      
          public SensorsDynamicFilterMqttEdgentQEP() throws Exception {
      
              // Obtain the execution environment
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              // Run this example in "ingestion time"
              env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
      
              // Start streaming from the fake sensor data source and from the "min,max" parameter topic
              DataStream<MqttMessage> temperatureStream = env.addSource(new FlinkMqttConsumer("topic-edgent"));
              DataStream<Tuple2<Double, Double>> parameterStream = env.addSource(new FlinkMqttConsumer("topic-parameter"))
                      .map(new ParameterMapper());
      
              DataStream<MqttMessage> filteredStream = temperatureStream.connect(parameterStream.broadcast())
                      .flatMap(new DynamicFilterCoFlatMapper());
      
              filteredStream.print();
      
              String executionPlan = env.getExecutionPlan();
              System.out.println("ExecutionPlan ........................ ");
              System.out.println(executionPlan);
              System.out.println("........................ ");
      
              env.execute("SensorsDynamicFilterMqttEdgentQEP");
          }
      
          // Two-input operator: input 1 carries sensor readings, input 2 carries range updates.
          public static class DynamicFilterCoFlatMapper
                  implements CoFlatMapFunction<MqttMessage, Tuple2<Double, Double>, MqttMessage> {
      
              private static final long serialVersionUID = -8634404029870404558L;
              // Current (min, max) range. This is a plain field, so it is not checkpointed state.
              private Tuple2<Double, Double> range = new Tuple2<Double, Double>(-1000.0, 1000.0);
      
              // Data input: forward the reading only if it lies inside the current range.
              @Override
              public void flatMap1(MqttMessage value, Collector<MqttMessage> out) throws Exception {
      
                  double payload = Double.parseDouble(value.getPayload());
      
                  if (payload >= this.range.f0 && payload <= this.range.f1) {
                      out.collect(value);
                  }
              }
      
              // Control input: replace the range; nothing is emitted.
              @Override
              public void flatMap2(Tuple2<Double, Double> value, Collector<MqttMessage> out) throws Exception {
                  this.range = value;
              }
          }
      
          // Parses a "min,max" payload into a (min, max) tuple; malformed input throws.
          public static class ParameterMapper implements MapFunction<MqttMessage, Tuple2<Double, Double>> {
      
              private static final long serialVersionUID = 7322348505833012711L;
      
              @Override
              public Tuple2<Double, Double> map(MqttMessage value) throws Exception {
                  String[] array = value.getPayload().split(",");
                  double min = Double.parseDouble(array[0]);
                  double max = Double.parseDouble(array[1]);
                  return new Tuple2<Double, Double>(min, max);
              }
          }
      }
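
One thing to watch in the implementation above: ParameterMapper throws on a malformed payload (for example "10" or "abc,20"), and an uncaught exception in a user function fails the task. Below is a hedged sketch of a more defensive parser in plain Java. `RangeParser` and `Range` are hypothetical names, and the choice to ignore bad updates silently is an assumption that may not suit every job.

```java
import java.util.Optional;

// Defensive parsing of a "min,max" control payload (plain Java, no Flink types).
final class RangeParser {

    // Immutable closed interval [min, max].
    static final class Range {
        final double min;
        final double max;

        Range(double min, double max) {
            this.min = min;
            this.max = max;
        }

        boolean contains(double value) {
            return value >= min && value <= max;
        }
    }

    // Returns the parsed range, or Optional.empty() if the payload is malformed.
    static Optional<Range> parse(String payload) {
        if (payload == null) {
            return Optional.empty();
        }
        // Limit -1 keeps trailing empty fields, so "10,20," is rejected below.
        String[] parts = payload.split(",", -1);
        if (parts.length != 2) {
            return Optional.empty();
        }
        try {
            double min = Double.parseDouble(parts[0].trim());
            double max = Double.parseDouble(parts[1].trim());
            if (Double.isNaN(min) || Double.isNaN(max) || min > max) {
                return Optional.empty();
            }
            return Optional.of(new Range(min, max));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("10.5, 20").isPresent());
        System.out.println(parse("abc,20").isPresent());
    }
}
```

With something like this, the control side could keep the previous range whenever `parse` returns an empty result instead of failing the job.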
      

      【Comments】:
