【问题标题】:How to configure flink jobs at runtime?如何在运行时配置 flink 作业?
【发布时间】:2017-06-28 12:56:36
【问题描述】:

是否可以在运行时配置 flink 应用程序?例如,我有一个流式应用程序,它读取输入,进行一些转换,然后过滤掉低于某个阈值的所有元素。但是,我希望这个阈值可以在运行时进行配置,这意味着我可以更改它而无需重新启动我的 flink 作业。示例代码:

DataStream<MyModel> myModelDataStream = // get input ...
  // do some stuff ...
  .filter(new RichFilterFunction<MyModel>() {
    @Override
    public boolean filter(MyModel value) throws Exception {
      return value.someValue() > someGlobalState.getThreshold();
    }
  })
  
// write to some sink ...
DataStream<MyConfig> myConfigDataStream = // get input ...
  // ...
  .process(new RichProcessFunction<MyConfig>() {
    someGlobalState.setThreshold(MyConfig.getThreshold());
  })

// ...

是否有可能实现这一目标?例如可以通过配置流更改的全局状态。

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    是的,您可以使用BroadcastProcessFunction 来完成此操作。大致是这样的:

    MapStateDescriptor<Void, Threshold> bcStateDescriptor = 
      new MapStateDescriptor<>("thresholds", Types.VOID, Threshold.class);
    
    DataStream<MyModel> myModelDataStream = // get input ...
    DataStream<Threshold> thresholds = // get input...
    BroadcastStream<Threshold> controlStream = thresholds.broadcast(bcStateDescriptor);
    
    DataStream<MyModel> result = myModelDataStream
      .connect(controlStream)
      .process(new MyFunction());
    
    public class MyFunction extends BroadcastProcessFunction<MyModel, Long, MyModel> {    
      @Override
      public void processBroadcastElement(Threshold newthreshold, Context ctx, Collector<MyModel> out) {
        MapStateDescriptor stateDescriptor = new MapStateDescriptor<>("thresholds", Types.VOID, Threshold.class)
        BroadcastState<Void, Threshold> bcState = ctx.getBroadcastState(stateDescriptor);  
        bcState.put(null, newthreshold);
      }
    
      @Override
      public void processElement(MyModel model, Collector<MyModel> out) {
        Threshold threshold = ctx.getBroadcastState(new MapStateDescriptor<>("threshold", Types.VOID, Threshold.class)).get(null);
        if (threshold.value() == null || model.getData() > threshold.value()) {
          out.collect(model);
        }
      }
    }
    

    【讨论】:

    • 根据您所说,我可以在我的 RuntimeContext 上放置一些变量并使用丰富的函数来访问它们吗?例如,在我的情况下,我有一个带有一些配置的类。
    • 请提出一个新问题,因为这听起来不相关,而且太复杂,无法在评论中回答。
    猜你喜欢
    • 2023-04-02
    • 1970-01-01
    • 2021-02-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多