【问题标题】:Flink Kafka Custom Consumer for Rate LimitFlink Kafka 自定义消费者用于速率限制
【发布时间】:2021-06-17 08:02:00
【问题描述】:

我正在尝试扩展 FlinkKafkaConsumer 以使用 flink 版本 1.12 在我的 Flink 作业中限制 Kafka 消费者。作为其中的一部分,我尝试遵循以下线程来实现相同的目的。但是,我在创建 AbstractFetcher 时遇到了 createFetcher 方法中的编译问题。

How to use Ratelimiter on flink?

他们是在 emitRecord 中命名的方法吗,因为我找不到 KafkaFetcherAbstractFetcher 类?

下面是代码sn-p

protected AbstractFetcher<T, ?> createFetcher(
        SourceContext<T> sourceContext,
        Map<KafkaTopicPartition, Long> partitionsWithOffsets,
        SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
        SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
        StreamingRuntimeContext runtimeContext,
        OffsetCommitMode offsetCommitMode,
        MetricGroup consumerMetricGroup, boolean useMetrics)
        throws Exception {

    return new KafkaFetcher<T>(
            sourceContext,
            partitionsWithOffsets,
            watermarksPeriodic,
            watermarksPunctuated,
            runtimeContext.getProcessingTimeService(),
            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
            runtimeContext.getUserCodeClassLoader(),
            runtimeContext.getTaskNameWithSubtasks(),
            deserializer,
            properties,
            pollTimeout,
            runtimeContext.getMetricGroup(),
            consumerMetricGroup,
            useMetrics) {
        protected void emitRecord(T record,
                                  KafkaTopicPartitionState<TopicPartition> partitionState,
                                  long offset) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecord(record, partitionState, offset);
        }

        @Override
        protected void emitRecordWithTimestamp(T record,
                                               KafkaTopicPartitionState<TopicPartition> partitionState,
                                               long offset, long timestamp) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
        }
    };

}

解决此问题的任何建议是获取 Flink Kafka Consumer 的自定义速率限制

【问题讨论】:

    标签: apache-flink flink-streaming flink-batch


    【解决方案1】:

    emitRecord 现在由org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter#emitRecord 实现。

    【讨论】:

    • 那么,在这种情况下,我如何实现限制 Flink Kafka Consumer 的限制。任何示例 sn-p 都可以帮助我
    • 如有任何建议,请提前致谢
    • 需要一些建议请。我被困在这一点上@David Anderson
    猜你喜欢
    • 2017-08-18
    • 2018-10-15
    • 1970-01-01
    • 2018-12-03
    • 2022-06-10
    • 1970-01-01
    • 2020-06-21
    • 2016-12-03
    • 1970-01-01
    相关资源
    最近更新 更多