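There are two ways to do this.
Option 1:
You can use the kafka-clients library to read the cluster metadata and get the list of topics. Add the Maven dependency (or the equivalent for your build tool):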
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
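Fetch the topics from the Kafka cluster and filter them with the regular expression below:
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.TopicListing;

// One capture group for the suffix after DC_TEST_
private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

// Returns the names of all non-internal topics whose full name matches topicPattern.
static List<String> findMatchingTopics() throws Exception {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("client.id", "java-admin-client");
    try (AdminClient client = AdminClient.create(properties)) {
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(false); // skip internal topics such as __consumer_offsets
        Collection<TopicListing> listings = client.listTopics(options).listings().get();
        return listings.stream()
                .map(TopicListing::name)
                // matches() requires the whole name to match; asPredicate() would accept substrings
                .filter(name -> topicPattern.matcher(name).matches())
                .collect(Collectors.toList());
    }
}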
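The filtering step can be tried without a Kafka cluster. The sketch below runs the same regex over a hard-coded list of topic names (the names and the `TopicFilterDemo` class are hypothetical) and shows two details worth checking: a pattern with an unclosed group such as `(DC_TEST_([A-Z0-9_]+)` throws `PatternSyntaxException`, and `asPredicate()` (which uses `find()`) also accepts names that merely contain a match, whereas `matcher(..).matches()` requires the whole name to match.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

// Hypothetical demo class; only java.util.regex and streams are used.
class TopicFilterDemo {
    static final Pattern TOPIC_PATTERN = Pattern.compile("DC_TEST_([A-Z0-9_]+)");

    // Keeps names where the entire string matches the pattern.
    static List<String> fullMatches(List<String> topics) {
        return topics.stream()
                .filter(t -> TOPIC_PATTERN.matcher(t).matches())
                .collect(Collectors.toList());
    }

    // Keeps names that contain a match anywhere (asPredicate() uses find()).
    static List<String> partialMatches(List<String> topics) {
        return topics.stream()
                .filter(TOPIC_PATTERN.asPredicate())
                .collect(Collectors.toList());
    }

    // Returns true if the regex string compiles.
    static boolean compiles(String regex) {
        try {
            Pattern.compile(regex);
            return true;
        } catch (PatternSyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList(
                "DC_TEST_ORDERS", "DC_TEST_A1_B2", "OLD_DC_TEST_X", "dc_test_lower", "DC_TEST_");
        System.out.println(fullMatches(topics));    // [DC_TEST_ORDERS, DC_TEST_A1_B2]
        System.out.println(partialMatches(topics)); // [DC_TEST_ORDERS, DC_TEST_A1_B2, OLD_DC_TEST_X]
        System.out.println(compiles("(DC_TEST_([A-Z0-9_]+)")); // false
    }
}
```

Full matching is the safer choice here because it mirrors how a pattern subscription is usually evaluated against topic names, so both options select the same topics.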
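Once you have the list of matching topics, you can pass it to FlinkKafkaConsumer, which has a constructor that accepts a List<String> of topic names.
Option 2:
In Flink 1.8, FlinkKafkaConsumer011 supports dynamic discovery of topics and partitions based on a regex pattern. For example: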
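import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

private static final Pattern topicPattern = Pattern.compile("DC_TEST_([A-Z0-9_]+)"); // class field
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
// Discovery is disabled by default; re-check for new matching topics every 10 s (example value).
properties.setProperty("flink.partition-discovery.interval-millis", "10000");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    topicPattern,
    new SimpleStringSchema(),
    properties);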
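Documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-topic-and-partition-discovery
In your case, option 2 is the best fit: the topic list from option 1 is fixed when the job starts, while a pattern subscription with discovery enabled also picks up matching topics created later.
Since you want to access the topic metadata in KafkaMessage, implement the KafkaDeserializationSchema interface, for example: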
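Conceptually, periodic topic discovery amounts to re-listing the cluster's topics on each round and subscribing to names that match the pattern but are not yet subscribed. The plain-Java sketch below illustrates only that idea; the `TopicDiscoverySketch` class and its methods are hypothetical and this is not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical illustration of pattern-based discovery; not Flink code.
class TopicDiscoverySketch {
    private final Pattern pattern;
    private final Set<String> subscribed = new LinkedHashSet<>(); // keeps discovery order

    TopicDiscoverySketch(Pattern pattern) {
        this.pattern = pattern;
    }

    // One discovery round: returns topics that match and were not subscribed before.
    List<String> discover(List<String> currentTopics) {
        List<String> newlyFound = new ArrayList<>();
        for (String topic : currentTopics) {
            // Set.add returns false if the topic was already subscribed
            if (pattern.matcher(topic).matches() && subscribed.add(topic)) {
                newlyFound.add(topic);
            }
        }
        return newlyFound;
    }

    Set<String> subscribed() {
        return subscribed;
    }

    public static void main(String[] args) {
        TopicDiscoverySketch d = new TopicDiscoverySketch(Pattern.compile("DC_TEST_([A-Z0-9_]+)"));
        System.out.println(d.discover(Arrays.asList("DC_TEST_A", "OTHER")));              // [DC_TEST_A]
        // A new topic appears on the cluster before the next round.
        System.out.println(d.discover(Arrays.asList("DC_TEST_A", "OTHER", "DC_TEST_B"))); // [DC_TEST_B]
        System.out.println(d.subscribed());                                                 // [DC_TEST_A, DC_TEST_B]
    }
}
```

A list computed once, as in option 1, corresponds to running only the first round, which is why topics created after startup are missed.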
import java.io.IOException;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// KafkaDeserializationSchema is an interface, so it is implemented, not extended.
public class CustomKafkaDeserializationSchema implements KafkaDeserializationSchema<KafkaMessage> {

    /**
     * Deserializes a Kafka record.
     *
     * @param record the Kafka record. record.key() is null if no key was set;
     *               record.value() is null if the message was empty or deleted.
     *               record.topic(), record.partition() and record.offset() give its metadata.
     *
     * @return The deserialized message as an object (null if the record cannot be deserialized).
     */
    @Override
    public KafkaMessage deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
        KafkaMessage kafkaMessage = new KafkaMessage();
        kafkaMessage.setTopic(record.topic());
        // Build the rest of your KafkaMessage here and assign the values as above.
        return kafkaMessage;
    }

    @Override
    public boolean isEndOfStream(KafkaMessage nextElement) {
        return false; // a Kafka topic is an unbounded stream
    }

    // Required because KafkaDeserializationSchema extends ResultTypeQueryable.
    @Override
    public TypeInformation<KafkaMessage> getProducedType() {
        return TypeInformation.of(KafkaMessage.class);
    }
}
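KafkaMessage itself is not defined in this answer. The sketch below is one hypothetical shape for it: the fields (topic, partition, offset, key, value) and the `of(...)` factory are assumptions, with key and value decoded as UTF-8 strings and nulls passed through. Flink's POJO rules expect a public class with a public no-arg constructor and getters/setters, so in a real project it would be a `public` class in its own `KafkaMessage.java`; it is package-private here only so the snippet compiles standalone.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical POJO; field names and types are assumptions.
class KafkaMessage {
    private String topic;
    private int partition;
    private long offset;
    private String key;   // null when the record has no key
    private String value; // null when the message was empty or deleted

    public KafkaMessage() {}

    // Hypothetical factory: builds a message from raw record fields, decoding bytes as UTF-8.
    static KafkaMessage of(String topic, int partition, long offset, byte[] key, byte[] value) {
        KafkaMessage m = new KafkaMessage();
        m.setTopic(topic);
        m.setPartition(partition);
        m.setOffset(offset);
        m.setKey(decode(key));
        m.setValue(decode(value));
        return m;
    }

    private static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }
    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }
    public long getOffset() { return offset; }
    public void setOffset(long offset) { this.offset = offset; }
    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }
    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public static void main(String[] args) {
        KafkaMessage m = KafkaMessage.of("DC_TEST_ORDERS", 3, 42L, null,
                "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(m.getTopic() + " " + m.getPartition() + " " + m.getOffset()
                + " " + m.getKey() + " " + m.getValue()); // DC_TEST_ORDERS 3 42 null hello
    }
}
```

With a class like this, the body of `deserialize` could reduce to `return KafkaMessage.of(record.topic(), record.partition(), record.offset(), record.key(), record.value());`, since `ConsumerRecord` exposes all five of those accessors.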
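Then create the consumer with the custom schema and add it as a source:
FlinkKafkaConsumer011<KafkaMessage> kafkaConsumer = new FlinkKafkaConsumer011<>(
    topicPattern, new CustomKafkaDeserializationSchema(), kafkaConsumerProps);
DataStream<KafkaMessage> stream = env.addSource(kafkaConsumer);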