【问题标题】:Write data from custom source to flink in continuous way以连续方式将数据从自定义源写入 flink
【发布时间】:2017-07-31 06:48:36
【问题描述】:

这是我第一次使用 Apache Flink (1.3.1) 并有一个问题。更详细地说,我正在使用 flink-core、flink-cep 和 flink-streaming 库。我的应用程序是一个 Akka ActorSystem,它使用来自 RabbitMQ 的消息,并且各种参与者处理这些消息。在某些演员中,我想从 Flink 实例化一个 StreamExecutionEnvironment 并处理传入的消息。因此,我编写了一个自定义源类,扩展了RichSourceFunction 类。一切正常,除了一件事:我不知道如何将数据发送到我的 Flink 扩展。这是我的设置:

public class FlinkExtension {

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

这是我的自定义源函数:

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

所以我想从外部调用我的FlinkExtension 类中的方法sendData 以连续的方式将数据写入我的FlinkExtension。这是我的 JUnit 测试应该将数据发送到扩展,然后将数据写入SourceContext

@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

但是如果我运行测试,没有任何反应,应用程序挂在CustomSourceFunction 的运行方法中。我还尝试在CustomSourceFunction run 方法中创建一个新的无限线程。

总结一下:有人知道如何以连续的方式将数据从应用程序写入 Flink 实例吗?

【问题讨论】:

    标签: java apache-flink flink-streaming flink-cep


    【解决方案1】:

    Flink 源连接器通过在 while(run) 循环内调用其 run() 方法(或 collectWithTimestamp())来发出连续的数据流。如果你想研究一个例子,Apache NiFi 源代码并不像大多数人那么复杂; here's its run method.

    【讨论】:

    • 与他的问题略有不同。看起来他想从另一个对象调用发送数据方法。但是 NiFi 示例是在源内部创建事务。从该客户端接收消息。方法调用比在对象实例之间创建连接/事务要容易得多。那么,有没有办法在没有事务的情况下向源添加消息?
    【解决方案2】:

    问题在于CustomSourceFunction 方法和sendData 方法使用了不同的对象实例。因此,context 对象不会在方法之间共享,并且添加新的 ValueEvent 不起作用。

    要解决此问题,请将run 方法使用的对象实例存储为CustomSourceFunction 类的静态成员变量。当您需要创建新的ValueEvent 时,请在同一对象实例上调用sendData 方法。

    见下面的示例代码

    package RuleSources;
    
    import Rules.Rule;
    import org.apache.flink.streaming.api.watermark.Watermark;
    
    import java.util.ArrayList;
    
    public class DynamicRuleSource extends AlertingRuleSource {
        private static DynamicRuleSource sourceObj;
    
        private SourceContext<Rule> ctx;
    
        public static DynamicRuleSource getSourceObject() {
            return sourceObj;
        }
    
        public void run(SourceContext<Rule> ctx) throws Exception {
            this.ctx = ctx;
            sourceObj = this;
            while(true) {
                Thread.sleep(100);
            }
        }
    
        public void addRule(Rule rule) {
            ctx.collect(rule);
        }
    
        @Override
        public void cancel() {
        }
    }
    

    添加新规则

     public static void addRule(Rule rule) throws Exception {
            AlertingRuleSource sourceObject = DynamicRuleSource.getSourceObject();
            sourceObject.addRule(rule);
        }
    

    【讨论】:

      猜你喜欢
      • 2018-06-11
      • 1970-01-01
      • 2020-11-26
      • 2019-08-19
      • 2022-01-19
      • 2021-10-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多