【发布时间】:2021-06-17 08:02:00
【问题描述】:
我正在尝试扩展 FlinkKafkaConsumer 以使用 flink 版本 1.12 在我的 Flink 作业中限制 Kafka 消费者。作为其中的一部分,我尝试遵循以下线程来实现相同的目的。但是,我在创建 AbstractFetcher 时遇到了 createFetcher 方法中的编译问题。
How to use Ratelimiter on flink?
他们是在 emitRecord 中命名的方法吗,因为我找不到 KafkaFetcher 和 AbstractFetcher 类?
下面是代码sn-p
protected AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> partitionsWithOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup consumerMetricGroup, boolean useMetrics)
throws Exception {
return new KafkaFetcher<T>(
sourceContext,
partitionsWithOffsets,
watermarksPeriodic,
watermarksPunctuated,
runtimeContext.getProcessingTimeService(),
runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
runtimeContext.getUserCodeClassLoader(),
runtimeContext.getTaskNameWithSubtasks(),
deserializer,
properties,
pollTimeout,
runtimeContext.getMetricGroup(),
consumerMetricGroup,
useMetrics) {
protected void emitRecord(T record,
KafkaTopicPartitionState<TopicPartition> partitionState,
long offset) throws Exception {
subtaskRateLimiter.acquire();
if (record == null) {
consumerMetricGroup.counter("invalidRecord").inc();
}
super.emitRecord(record, partitionState, offset);
}
@Override
protected void emitRecordWithTimestamp(T record,
KafkaTopicPartitionState<TopicPartition> partitionState,
long offset, long timestamp) throws Exception {
subtaskRateLimiter.acquire();
if (record == null) {
consumerMetricGroup.counter("invalidRecord").inc();
}
super.emitRecordWithTimestamp(record, partitionState, offset, timestamp);
}
};
}
解决此问题的任何建议是获取 Flink Kafka Consumer 的自定义速率限制
【问题讨论】:
标签: apache-flink flink-streaming flink-batch