【问题标题】:how to buffer a batch of data in flinkflink中如何缓存一批数据
【发布时间】:2021-11-11 01:01:31
【问题描述】:

我想在 flink 中缓冲一个数据流。我最初的想法是将 100 条数据缓存到一个列表或元组中,然后使用insert into values (???) 将数据批量插入到 clickhouse。你有更好的方法吗?

【问题讨论】:

  • 缓冲区数据流到底是什么意思?您是否想在一个窗口中收集数据(基于时间或事件数量)并刷新事件而不聚合它们?
  • 正是...在窗口中收集然后刷新它们

标签: apache-flink clickhouse data-stream


【解决方案1】:

如果要批量导入数据到数据库,可以使用窗口(countWindow或timeWindow)来聚合数据。

【讨论】:

    【解决方案2】:

    感谢所有答案。我使用窗口函数来解决这个问题。

    SingleOutputStreamOperator<ArrayList<User>> stream2 = 
         stream1.countWindowAll(batchSize).process(new MyProcessWindowFunction());
    

    然后我覆盖处理函数,其中数据的批量大小缓冲在ArrayList中。

    【讨论】:

      【解决方案3】:

      您发布的第一个解决方案有效,但它很不稳定。由于简单的逻辑,它可能导致饥饿。例如,假设您有一个计数器 100 来创建一个批次。您的流可能永远不会收到 100 事件,或者需要几个小时才能收到 100th 事件。然后,您的基本且有效的解决方案可能会将事件卡在窗口批次中,因为它是一个计数窗口。换句话说,您的批次可以在高吞吐量时生成 30 秒的窗口,或者在吞吐量非常低时生成 1 小时的窗口。

      DataStream<User> stream = ...;
      DataStream<Tuple2<User, Long>> stream1 = stream
          .countWindowAll(100)
          .process(new MyProcessWindowFunction());
      

      通常,这取决于您的用例。但是,我会使用 时间窗口 来确保我的工作始终具有刷新批次,即使窗口上的事件很少或没有。

      DataStream<Tuple2<User, Long>> stream1 = stream
          .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30)))
          .process(new MyProcessWindowFunction());;
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-06-23
        • 1970-01-01
        • 2021-04-04
        • 1970-01-01
        • 1970-01-01
        • 2021-11-20
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多