【问题标题】:Is it possible to capture output of a flink datastream to a list是否可以将 flink 数据流的输出捕获到列表中
【发布时间】:2020-10-09 15:55:23
【问题描述】:

我是 flink 的新手,不知道这是正确的方法还是愚蠢的事情我有一个字符串数据类型的数据流,我正在尝试将数据流中的数据捕获到一个列表中,我正在尝试类似下面的方法

public class DataCapture {

    public static List<String> stringList(DataStream<String> dataStream) {

        List<String> myOutputlist = new ArrayList<>();

        dataStream.flatMap(new FlatMapFunction<String, List<String>>() {
            @Override
            public void flatMap(String value, Collector<List<String>> out) throws Exception {
                System.out.println("==================DATASTREAM-VALUE=====================" +value);
                myOutputlist.add(value);
                out.collect(myOutputlist);
            }
        });

        return myOutputlist;
    }

}

有没有办法我可以把它放到一个列表中,我什至尝试添加一个接收器并尝试将输出捕获到同样不起作用的列表事件中

【问题讨论】:

    标签: apache-flink flink-streaming


    【解决方案1】:

    不确定生产代码,但在几个测试用例中,我使用了 CollectSink 包裹 List,类似于 this one

    // a testing sink
    class CollectSink implements SinkFunction<String> {
    
        // must be static
        public static final List<String> values = new CopyOnWriteArrayList<>();
    
        @Override
        public synchronized void invoke(String value) throws Exception {
            values.add(value);
        }
    }
    

    此接收器将收集value 列表中的元素。 您只需将此接收器添加到管道中。

    更新:正如@kkrugler 指出的那样,将ArrayList 替换为线程安全的CopyOnWriteArrayList List 实现,以便能够安全地使用并行度大于1 的接收器。

    【讨论】:

    • 请注意,如果接收器的并行度 > 1,这可能会出现问题,因为您将可以同时访问 ArrayList。我通常推荐使用ConcurrentLinkedQueue,它是线程安全的。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多