【问题标题】:How to use the Tumbling window function for the non keyed streaming data in Flink?Flink 中非 keyed 流数据如何使用 Tumbling 窗口功能?
【发布时间】:2021-08-19 16:00:53
【问题描述】:

我想为我的程序(非键控数据)使用翻转窗口功能,因为它正在处理流数据,但只有 300 条消息/秒。我想把它提高到至少 5K/秒。为此,我想使用翻滚窗口 2 秒来加快其性能。但我不确定如何在我的情况下使用它。

注意:我使用 Geomesa HBase 平台来保存消息。 另外,我没有在这里粘贴我的整个应用程序代码,因为我只需要窗口函数,这里的代码足以让您理解

这是我的 flink 代码

public class Tranport {

    public static void main(String[] args) throws Exception {

        // fetch runtime arguments
        String bootstrapServers = "xx.xx.xxx.xxx:xxxx";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set up the Consumer and create a datastream from this source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", "group_id");
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>("lc", new SimpleStringSchema(), properties);
        flinkConsumer.setStartFromTimestamp(Long.parseLong("0"));

        DataStream<String> readingStream = env.addSource(flinkConsumer);
        readingStream.rebalance().map(new RichMapFunction<String, String>() {

            private static final long serialVersionUID = -2547861355L; // random number

            DataStore lc_live = null;
            
            SimpleFeatureType sft_live;
            SimpleFeatureBuilder SFbuilderLive; // feature builder for live

            List<SimpleFeature> lc_live_features; // 

            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("In open method.");

                // --- GEOMESA, GEOTOOLS APPROACH ---//
                // define connection parameters to xxx GeoMesa-HBase DataStore
                Map<String, Serializable> params_live = new HashMap<>();
                params_live.put("xxxx", "xxx"); // HBase table name
                params_live.put("xxxx","xxxx");

                try {
                    lc_live = DataStoreFinder.getDataStore(params_live);
                    if (lc_live == null) {
                        System.out.println("Could not connect to live");
                    } else {
                        System.out.println("Successfully connected to live");
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

                // create simple feature type for x table in HBASE 
                StringBuilder attributes1 = new StringBuilder();
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Long,");
                attributes1.append("source:String,");
                attributes1.append("xxx:String,");
                attributes1.append("xxx:Double,");
                attributes1.append("status:String,");
                attributes1.append("forecast:Double,");
                attributes1.append("carsCount:Integer,");
                attributes1.append("*xxx:Point:srid=4326");
                sft_history = SimpleFeatureTypes.createType("xxxx", attributes1.toString());

                try {
                    lc_history.createSchema(sft_history);                                               

                } catch (IOException e) {
                    e.printStackTrace();
                }

                // Initialize the variables
                numberOfMessagesProcessed = 0;
                numberOfMessagesFailed = 0;
                numberOfMessagesSkipped = 0;

        // for lc_Live
                lc_live_features = new ArrayList<>();
                SFbuilderLive = new SimpleFeatureBuilder(sft_live);

这里我想创建一个翻滚窗口函数(Window All),它可以把所有的 在 2 秒的窗口内流式传输消息并将它们推送到我在下面创建的数组列表中

        
                        // live GeoMesa-HBase DataStore
                        // copy the list into a local variable and empty the list for the next iteration
                        List<SimpleFeature> LocalFeatures = live_features;
                        live_features = new ArrayList<>();
                        LocalFeatures = Collections.unmodifiableList(LocalFeatures);
                        try (FeatureWriter<SimpleFeatureType, SimpleFeature> writer = live.getFeatureWriterAppend(sft_live.getTypeName(), Transaction.AUTO_COMMIT)) {
                            System.out.println("Writing " + LocalFeatures.size() + " features to live");
                            for (SimpleFeature feature : LocalFeatures) {
                                SimpleFeature toWrite = writer.next();
                                toWrite.setAttributes(feature.getAttributes());
                                ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
                                toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
                                toWrite.getUserData().putAll(feature.getUserData());
                                writer.write();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

【问题讨论】:

  • 听起来您希望通过使用 windows 来提高性能。这是非常不可能的。我建议您使用分析器来查看时间的去向。
  • @DavidAnderson 是的,我想提高它的性能,我这样做是因为这个应用程序正在处理和存储大量的流数据,对于每条消息,它都会重复整个过程,这就是为什么它需要时间.我想每 2 秒将消息推送到一个数组列表中,这将一次处理所有消息。因此将提高性能
  • 您可以在文档中找到教程:ci.apache.org/projects/flink/flink-docs-release-1.13/docs/…,并在训练练习中找到示例:github.com/apache/flink-training/tree/master/hourly-tips

标签: java apache-kafka apache-flink windowing geomesa


【解决方案1】:

已经很晚了,但可能会对某人有所帮助。在 Scala 中,您可以执行类似的操作

 env.addSource(consumer).
      windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))  

但是,请记住,如果您没有使用KeyBy(),那么无论您在env.setParallelism() 中设置什么值,您的数据都不会被并行处理

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多