【问题标题】:Kafka producer with default partitioning具有默认分区的 Kafka 生产者
【发布时间】:2019-02-03 00:32:45
【问题描述】:

现在我的 kafka 生产者正在将所有消息下沉到一个 kafka 主题的单个分区,该分区实际上有多个分区。

我如何创建一个生产者来使用默认分区器并在主题的不同分区之间分发消息。

我的kafka生产者的代码sn-p:

Properties props = new Properties();
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrap.servers);
props.put(ProducerConfig.ACKS_CONFIG, "all");

我正在使用 flink kafka producer 来接收关于 kafka 主题的消息。

speStream.addSink(
    new FlinkKafkaProducer011(kafkaTopicName,
    new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), 
    props, 
    FlinkKafkaProducer011.Semantic.EXACTLY_ONCE)

【问题讨论】:

    标签: apache-kafka apache-flink


    【解决方案1】:

    使用默认分区器,消息会使用以下逻辑分配一个分区:

    • 键控消息:生成键的散列并基于该散列选择分区。这意味着具有相同密钥的消息将最终在同一个分区上

    • 未加密的消息:轮询用于分配分区

    解释您看到的行为的一个选项是,如果您对所有消息使用相同的密钥,那么使用默认分区程序,它们最终将位于同一个分区上。

    【讨论】:

    • 我不确定这里的关键部分。我正在使用 flink kafka producer 来接收关于 kafka 主题的消息。 speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), props, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
    【解决方案2】:

    通过将 flinkproducer 改为

    解决了这个问题

    speStream.addSink(new FlinkKafkaProducer011(kafkaTopicName,new SimpleStringSchema(), props));

    【讨论】:

    • FlinkPartitloner 与 Kafka 分区器不同
    猜你喜欢
    • 2021-10-04
    • 2017-10-18
    • 2017-04-28
    • 2023-03-26
    • 1970-01-01
    • 2018-02-05
    • 2019-11-10
    • 1970-01-01
    • 2019-06-19
    相关资源
    最近更新 更多