【问题标题】:Kafka Streams - How to scale Kafka store generated changelog topicsKafka Streams - 如何扩展 Kafka 存储生成的变更日志主题
【发布时间】:2018-11-28 21:35:20
【问题描述】:

我有多个冗余应用程序实例,它们想要消耗一个主题的所有事件并将它们独立存储以进行磁盘查找(通过 Rocksdb)。

为了论证起见,我们假设这些多余的消费者正在服务于无状态的 http 请求;因此负载不是使用 kafka 共享的,而是 kafka 用于将数据从生产者复制到每个实例本地存储中。

查看生成的主题时,每个消费应用创建了 3 个额外主题:

  • {topicname}STATE-STORE-0000000000-changelog
  • {应用程序名称}-{商店名称}-更改日志
  • {应用程序名称}-{商店名称}-重新分区

但是这些生成的每个主题都与原始主题的压缩视图一样大。这意味着每个消费存储将原始主题的大小(已经压缩)乘以 3。

  1. 为什么kafka store需要这3个topic。我们不能简单地将流配置为在协调 ondisk 存储时从上次使用的偏移量重新加载吗?
  2. 是冗余消费应用程序的每个实例都获得其唯一的 3 个“存储生成的主题”集,还是应该将它们配置为共享同一组变更日志主题?那么,它们应该共享相同的 applicationId 还是不共享,因为它们需要消耗所有分区的所有事件?

简而言之,我担心存储可扩展性,因为我们会增加使用的应用程序的数量,这会产生更多的变更日志主题...

这是创建商店的代码

public class ProgramMappingEventStoreFactory {
  private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
  private final static String STORE_NAME = "program-mapping-store";
  private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";

  public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
                                                                                               String avroRegistryUrl,
                                                                                               String topic,
                                                                                               String storeDirectory)
  {
    Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
                                                     .withSchemaRegistryUrl(avroRegistryUrl)
                                                     .withApplicationId(createApplicationId(APPLICATION_NAME))
                                                     .withGroupId(UUID.randomUUID().toString())
                                                     .withClientId(UUID.randomUUID().toString())
                                                     .withDefaultKeySerdeClass(SpecificAvroSerde.class)
                                                     .withDefaultValueSerdeClass(SpecificAvroSerde.class)
                                                     .withStoreDirectory(storeDirectory)
                                                     .build();

    StreamsBuilder streamBuilder = new StreamsBuilder();
    bootstrapStore(streamBuilder, topic);
    KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
    streams.start();
    try {
      return getStoreAndBlockUntilQueryable(STORE_NAME,
                                            QueryableStoreTypes.keyValueStore(),
                                            streams);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
    }
  }

  private static <T> T getStoreAndBlockUntilQueryable(String storeName,
                                                      QueryableStoreType<T> queryableStoreType,
                                                      KafkaStreams streams)
    throws InterruptedException
  {
    while (true) {
      try {
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        Thread.sleep(100);
      }
    }
  }

  private static void bootstrapStore(StreamsBuilder builder, String topic) {
    KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);

    table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
                                                        (newValue, aggValue) -> null,
                                                        Materialized.as(STORE_NAME));

  }

  private static String createApplicationId(String applicationName) {
    try {
      return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
      logger.warning(() -> "Failed to find the hostname, generating a uique applicationId");
      return String.format("%s-%s", applicationName, UUID.randomUUID());
    }
  }

}

【问题讨论】:

  • 你能分享你的代码吗?我有点不清楚你到底想做什么。关于主题重用:您是正确的,Kafka Streams 实际上进行了此优化(请注意,在某些版本中存在回归,因此您可能无法获得此优化:issues.apache.org/jira/browse/KAFKA-6729)。我也不确定您所说的“冗余”是什么意思:如果您配置相同的application.id,则实例形成一个消费者组并共享负载(即,数据在实例上进行分区)。
  • 我已经添加了创建商店的代码。在上面的场景中,我将创建同一个应用程序的多个实例来创建商店(每个实例都需要缓存相同的数据)。我在每个实例上强制执行唯一的 applicationId 以确保它们不会获得数据碎片。但因此每个实例都会创建 3 个新的更改日志主题。
  • groupBy((k, v) -&gt; KeyValue.pair(k, v)).reduce() 的目的是什么?是的,每个实例显然都会创建它自己的内部主题——如果您为每个实例使用不同的 application.id,它基本上是三个相互完全隔离的独立应用程序。但是,我想知道您是否可能想使用builder.globalKTable(),它为您提供数据的完整副本,并且您可以在每个实例中使用相同的application.id。 (你用的是什么版本?)
  • 我实际上只是偶然发现了 globalKTable 的概念来支持存储而不是普通的 KTable。我正在使用 1.0.0 版的 kafka。考虑到所有实例都具有相同 applicationId 的 globalKTable,我是否仍然期望新的变更日志主题?那么重新分区主题和 STATESTORE-0000 主题呢,它们会与 GlobalKTable 一起消失吗?
  • 由于groupBy((k, v) -&gt; KeyValue.pair(k, v)).reduce(),您获得了重新分区主题——不知道您为什么这样做?如果你需要这个,你总会得到重新分区的主题。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

如果您想将相同的状态加载到多个实例中,您应该在所有实例中使用GlobalKTable 和唯一的application.id (builder.globalTable())。

如果您使用KTable,数据会被分区,这会迫使您为每个实例使用不同的application.id。这可以被认为是一种反模式。

我也不确定,你为什么要groupBy((k, v) -&gt; KeyValue.pair(k, v)).reduce()——这会导致不必要的重新分区主题。

对于为table() 运算符生成的更改日志主题,如果使用StreamsBuilderKStreamBuilder 不受影响),1.01.1 版本中存在已知错误。它已在 2.0 版本中修复 (https://issues.apache.org/jira/browse/KAFKA-6729)

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2019-09-07
  • 2021-01-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-08
相关资源
最近更新 更多