【问题标题】:Flink Kafka Producer elements out of orderFlink Kafka Producer 元素乱序
【发布时间】:2018-11-07 10:46:35
【问题描述】:

我正在尝试使用 FlinkKafkaProducer010 生成元素,但是当我打开消费者控制台窗口时,元素似乎无序到达。

我使用以下方法创建了主题: kafka-topics.bat --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

消费者是使用以下方法创建的:kafka-console-consumer.bat --zookeeper localhost:2181 --topic mytopic

我使用的 Kafka Producer 代码是:

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if(parameterTool.getNumberOfParameters() < 2) {
        System.out.println("Missing parameters!");
        System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
        return;
    }

    StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    DataStream<String> messageStream = env.addSource(getSourceFunction());

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
    messageStream.addSink(producer);
    env.execute("Kafka Producer");
}

public static SourceFunction<String> getSourceFunction() {
    return new SourceFunction<String>() {
        private static final long serialVersionUID = 6369260225318862378L;
        public boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
            int counter = 0;
            while (this.running && counter < 500) {
                String data = "item " + Integer.toString(counter);
                ctx.collect(data);

                counter++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    };
}

当我查看 Kafka 日志文件时,我看到一个 .log 文件,其中的元素也乱序。元素的排序使大约 10 个值的跳跃。在我的用例中,必须有正确的顺序。我一直在寻找如何确保元素按顺序到达,但到目前为止没有任何运气。 有什么我错过的东西可以修复排序吗?

提前感谢您的帮助!

【问题讨论】:

  • 您已保证在分区中排序。你能以编程方式创建消费者并检查吗?
  • 我已经在使用 FlinkKafkaConsumer010 并将 TimeCharacteristic 设置为 EventTime 来使用 Flink 中的元素。然而,这也有乱序到达,所以我回到了检查的基础,只检查 Kafka 和一个简单的 Kafka 控制台消费者。

标签: apache-kafka apache-flink kafka-producer-api


【解决方案1】:

我猜你正在使用并行度 > 1 作为接收器。元素的顺序仅在单个运算符实例中得到保证。如果您从接收器的多个并行实例写入单个 kafka 分区,则无法保证顺序。

【讨论】:

    猜你喜欢
    • 2021-11-08
    • 2019-12-10
    • 2021-03-22
    • 2019-02-16
    • 2019-07-18
    • 1970-01-01
    • 2017-03-22
    • 1970-01-01
    • 2019-06-13
    相关资源
    最近更新 更多