【问题标题】:Reading latest data from Kafka broker in Apache Flink从 Apache Flink 中的 Kafka 代理读取最新数据
【发布时间】:2018-02-01 11:07:57
【问题描述】:

我想从 Kafka 接收最新的数据到 Flink 程序,但是 Flink 正在读取历史数据。 我已将auto.offset.reset 设置为latest 如下所示,但它不起作用

properties.setProperty("auto.offset.reset", "latest");

Flink Programm 正在使用以下代码从 Kafka 接收数据

//getting stream from Kafka and giving it assignTimestampsAndWatermarks

        DataStream<JoinedStreamEvent> raw_stream = envrionment.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new JoinSchema(), properties)).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

我正在关注关于 https://issues.apache.org/jira/browse/FLINK-4280 ,建议按照下面提到的方式添加源

Properties props = new Properties();
...

FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("topic", schema, props);
kafka.setStartFromEarliest();
kafka.setStartFromLatest();
kafka.setEnableCommitOffsets(boolean); // if true, commits on checkpoint if checkpointing is enabled, otherwise, periodically.
kafka.setForwardMetrics(boolean);
...

env.addSource(kafka)

我也这样做了,但是我无法访问setStartFromLatest()

 FlinkKafkaConsumer09 kafka = new FlinkKafkaConsumer09<JoinedStreamEvent>( "test", new JoinSchema(),properties);

我应该怎么做才能接收发送到的最新值 Kafka 而不是从历史中接收价值?

【问题讨论】:

    标签: apache-kafka apache-flink kafka-producer-api flink-streaming flink-cep


    【解决方案1】:

    通过为发送者和消费者创建名为 test1 的新 group id 并保持主题名称与 test 相同,问题得到解决。

    现在我想知道,这是解决此问题的最佳方法吗?因为 每次我需要提供一个新的组 id 时

    有什么方法可以读取发送到 Kafka 的数据吗?

    【讨论】:

      【解决方案2】:

      我相信这对你有用。它对我有用。修改属性和你的 kafka 主题。

      public static void main(String[] args) throws Exception {
      
      
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", "ip:port");
              properties.setProperty("zookeeper.connect", "ip:port");
              properties.setProperty("group.id", "your-group-id");
      
      
              DataStream<String> stream = env
                      .addSource(new FlinkKafkaConsumer09<>("your-topic", new SimpleStringSchema(), properties));
      
      
      
              stream.writeAsText("your-path", FileSystem.WriteMode.OVERWRITE)
                  .setParallelism(1);
      
              env.execute();
      
          }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-11-08
        • 2017-05-06
        • 2018-05-13
        • 2018-05-09
        • 2017-03-09
        • 2019-12-20
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多