【问题标题】:How to get the processing kafka topic name dynamically in Flink Kafka Consumer?如何在 Flink Kafka Consumer 中动态获取正在处理的 kafka 主题名称?
【发布时间】:2019-12-07 12:22:55
【问题描述】:

目前我有一个 Flink Cluster 想通过一个 Pattern 消费 Kafka Topic,通过这种方式,我们不需要维护一个硬编码的 Kafka 主题列表。

import java.util.regex.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
...
private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
...
FlinkKafkaConsumer010<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, deserializerClazz.newInstance(), kafkaConsumerProps);
DataStream<KafkaMessage> input = env.addSource(kafkaConsumer);

我只是想通过上面的方式知道,在处理过程中如何才能知道真正的Kafka主题名称? 谢谢。

--更新-- 我之所以需要知道主题信息,是因为我们需要这个主题名称作为参数,在接下来的 Flink sink 部分中使用。

【问题讨论】:

  • 嗨@Deadpool KafkaMessage类是我们开发的,有'topic'、'key'、'offset'等多个字段,将通过自研的deserialize class.Tks进行反序列化。跨度>

标签: java apache-kafka apache-flink flink-streaming


【解决方案1】:

有两种方法可以做到这一点。

选项 1:

您可以使用 Kafka-clients 库来访问 Kafka 元数据,获取主题列表。添加 Maven 依赖项或等效项。

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

您可以从 Kafka 集群中获取主题并使用下面给出的正则表达式进行过滤

 private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");

  Properties properties = new Properties();
  properties.put("bootstrap.servers","localhost:9092");
  properties.put("client.id","java-admin-client");
  try (AdminClient client = AdminClient.create(properties)) {
     ListTopicsOptions options = new ListTopicsOptions();
     options.listInternal(false);
      Collection<TopicListing> listing =  client.listTopics(options).listings().get();
      List<String> allTopicsList = listings.stream().map(TopicListing::name)
      .collect(Collectors.toList());
      List<String> matchedTopics = allTopicsList.stream()
                            .filter(topicPattern.asPredicate())
                            .collect(Collectors.toList());
    }catch (Exception e) {
      e.printStackTrace();
    }
}

一旦你匹配了Topics列表,你就可以将它传递给FlinkKafkaConsumer。

选项 2:

Flink 1.8 版中的FlinkKafkaConsumer011 支持基于模式动态发现主题和分区。下面是例子:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 private static final Pattern topicPattern = Pattern.compile("(DC_TEST_([A-Z0-9_]+)");
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    topicPattern ,
    new SimpleStringSchema(),
    properties);

链接:https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery

在您的情况下,选项 2 最适合。

由于您想在 KafkaMessage 中访问主题元数据,因此您需要实现 KafkaDeserializationSchema 接口,如下所示:

public class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema<KafkaMessage> {
    /**
     * Deserializes the byte message.
     *
     * @param messageKey the key as a byte array (null if no key has been set).
     * @param message The message, as a byte array (null if the message was empty or deleted).
     * @param partition The partition the message has originated from.
     * @param offset the offset of the message in the original source (for example the Kafka offset).
     *
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    @Override
    public KafkaMessage deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        //You can access record.key(), record.value(), record.topic(), record.partition(), record.offset() to get topic information.
         KafkaMessage kafkaMessage = new KafkaMessage();
         kafkaMessage.setTopic(record.topic());
         // Make your kafka message here and assign the values like above.
        return kafkaMessage ;
    }

    @Override
    public boolean isEndOfStream(Long nextElement) {
        return false;
    }       
}

然后调用:

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);

【讨论】:

  • 您好 Tks,但这是否意味着一旦创建了新的 kafka 主题,就需要重新创建 flink 作业以获取完整的主题列表?
  • Flink 还支持动态主题和分区发现。请检查更新的答案
  • 您好 Tks,您能否提供更多关于如何将主题发布的代码?实际上,我们使用的是FlinkKafkaConsumer010&lt;KafkaMessage&gt; KafkaMessage 是我们自己定义的一个bean。
  • 如果你想把主题信息放到KafkaMessage中,你应该为类型KafkaMessage实现KafkaDeserializationSchema类。请检查更新的答案
  • 您好,我的问题描述一开始就已经涵盖了您的选项2,但是无论如何,访问主题名称本身仍然没有提及,Tks仍然。
【解决方案2】:

您可以实现自己的自定义 KafkaDeserializationSchema,如下所示:

  public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return new Tuple2<>(record.topic(), new String(record.value(), "UTF-8"));
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
  }

使用自定义的 KafkaDeserializationSchema,您可以创建元素包含主题信息的 DataStream。在我的演示案例中,元素类型为Tuple2&lt;String, String&gt;,因此您可以通过Tuple2#f0 访问主题名称。

FlinkKafkaConsumer010<Tuple2<String, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(
          topicPattern, new CustomKafkaDeserializationSchema, kafkaConsumerProps);
DataStream<Tuple2<String, String>> input = env.addSource(kafkaConsumer);

input.process(new ProcessFunction<Tuple2<String,String>, String>() {
            @Override
            public void processElement(Tuple2<String, String> value, Context ctx, Collector<String> out) throws Exception {
                String topicName = value.f0;
                // your processing logic here.
                out.collect(value.f1);
            }
        });

【讨论】:

  • Tks,通过使用这个自己的自定义 KafkaDeserializationSchema,您能否提供更多信息,我们如何获得 kafka 主题?谢了。
  • @JMS,嗨,我已经更新了答案,希望对你有用。
  • 谢谢,我不太确定你的访问方式“Tuple2#f0”,你的意思是使用DataStreamUtils.collect(input)吗?
  • 处理DataStream时可以访问主题名。我添加了一个带有process函数的演示。
  • 谢谢,我现在明白了,您能再帮忙解决一个要求吗?实际上,我想要获取真正的主题列表的原因是在获得完整的主题列表之后,我需要创建新的 flink kakfaconsume 以将 kafka 数据下沉到 HDFS 中,其中主题名称需要在写入的桶文件中使用,通过使用这个这样我就不需要在本地通过CSV文件手动维护主题列表了,你觉得可以吗?
猜你喜欢
  • 1970-01-01
  • 2022-06-30
  • 2018-09-30
  • 2016-02-08
  • 2016-05-24
  • 1970-01-01
  • 2019-10-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多