【Question title】: How to implement a BOUNDED source for Flink's batch execution mode?
【Posted】: 2022-01-09 16:26:53
【Question】:

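I am trying to run a Flink (1.12.1) batch job with the following steps:

  • A custom SourceFunction that connects to MongoDB
  • Some flatMap and map operations to transform the data
  • A sink into another MongoDB

I am running it in a StreamExecutionEnvironment with RuntimeExecutionMode.BATCH, but the application throws an exception because my source is detected as UNBOUNDED, and I cannot find a way to declare it as BOUNDED (it should finish once it has collected all documents in the Mongo collection).

Exception: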

    Exception in thread "main" java.lang.IllegalStateException: Detected an UNBOUNDED source with the 'execution.runtime-mode' set to 'BATCH'. This combination is not allowed, please set the 'execution.runtime-mode' to STREAMING or AUTOMATIC
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.shouldExecuteInBatchMode(StreamGraphGenerator.java:335)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:258)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1958)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1943)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1782)
        at com.grupotsk.bigdata.matadatapmexporter.MetadataPMExporter.main(MetadataPMExporter.java:33)

Some code:

Execution environment:

public static StreamExecutionEnvironment getBatch() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    env.addSource(new MongoSource()).print();

    return env;
}

Mongo source:

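import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.bson.Document;

public class MongoSource extends RichSourceFunction<Document> {

    private static final long serialVersionUID = 8321722349907219802L;
    private MongoClient mongoClient;
    private MongoCollection<Document> mc;

    @Override
    public void open(Configuration con) {
        // Connect once per task and keep a handle to the collection to read.
        mongoClient = new MongoClient(
                new MongoClientURI("mongodb://localhost:27017/database"));
        mc = mongoClient.getDatabase("database").getCollection("collection");
    }

    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        // Emit every document in the collection, then release the client.
        MongoCursor<Document> itr = mc.find(Document.class).cursor();
        while (itr.hasNext()) {
            ctx.collect(itr.next());
        }
        this.cancel();
    }

    @Override
    public void cancel() {
        mongoClient.close();
    }
}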

Thanks!

【Comments】:

    Tags: java mongodb apache-flink


    【Solution 1】:

    Sources used with RuntimeExecutionMode.BATCH must implement Source rather than SourceFunction, and sinks should implement Sink rather than SinkFunction.

    For an introduction to these new interfaces, see Integrating Flink into your ecosystem - How to build a Flink connector from scratch. They are described in FLIP-27: Refactor Source Interface and FLIP-143: Unified Sink API.
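
To make the distinction concrete, here is a minimal, dependency-free sketch of the contract. It is not Flink's API: `SketchSource`, `SketchReader`, `CollectionSource` and `runBatch` are hypothetical names, and the two enums are local stand-ins that only mimic the idea behind Flink's `Source.getBoundedness()` and the end-of-input status a FLIP-27 reader reports. The point it illustrates is that batch mode looks at what the source declares, not at whether a `run()` method happens to return.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Local stand-ins (assumptions for this sketch), NOT imported from Flink.
enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

// Hypothetical, simplified source contract: declare boundedness, create a reader.
interface SketchSource<T> {
    Boundedness getBoundedness();
    SketchReader<T> createReader();
}

// Hypothetical reader: emits at most one record per call and reports its status.
interface SketchReader<T> {
    InputStatus pollNext(List<T> out);
}

// A source over a finite list, e.g. the documents returned by a single query.
final class CollectionSource<T> implements SketchSource<T> {
    private final List<T> records;

    CollectionSource(List<T> records) {
        this.records = records;
    }

    @Override
    public Boundedness getBoundedness() {
        return Boundedness.BOUNDED; // the declaration batch mode relies on
    }

    @Override
    public SketchReader<T> createReader() {
        Iterator<T> it = records.iterator();
        return out -> {
            if (it.hasNext()) {
                out.add(it.next());
            }
            return it.hasNext() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        };
    }
}

final class BoundedSourceSketch {
    // Mimics the planner check: batch mode rejects sources not declared BOUNDED.
    static <T> List<T> runBatch(SketchSource<T> source) {
        if (source.getBoundedness() != Boundedness.BOUNDED) {
            throw new IllegalStateException("Detected an UNBOUNDED source in BATCH mode");
        }
        SketchReader<T> reader = source.createReader();
        List<T> out = new ArrayList<>();
        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
            // keep polling until the reader reports end of input
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runBatch(new CollectionSource<>(Arrays.asList("a", "b", "c"))));
    }
}
```

In this model, a source that reads everything and then stops would still be rejected if it declared `CONTINUOUS_UNBOUNDED`. That matches the situation in the question, where the `SourceFunction` finishes on its own but is still detected as UNBOUNDED.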

    【Discussion】:

    • Thanks!!!!!!