【问题标题】:Kafka Streams KTable configuration error on Message Hub消息中心上的 Kafka Streams KTable 配置错误
【发布时间】:2018-08-06 16:40:11
【问题描述】:

消息中心现已解决此问题

我在 Kafka 中创建 KTable 时遇到了一些问题。我是 Kafka 的新手,这可能是我问题的根源,但我想无论如何我都可以在这里问。我有一个项目,我想通过计算它们的总出现次数来跟踪不同的 ID。我在 IBM Cloud 上使用 Message Hub 来管理我的主题,到目前为止效果很好。

我在 Message Hub 上有一个主题,它会生成像 {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"} 这样的消息,目前,唯一相关的关键字是 ID。

我的 Kafka 代码以及 Streams 配置如下所示:

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();

当我运行代码时,我收到以下错误:

线程“KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1”中的异常 org.apache.kafka.streams.errors.StreamsException:无法创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE- 0000000003-重新分区。

接着是:

原因:java.util.concurrent.ExecutionException:org.apache.kafka.common.errors.PolicyViolationException:无效配置:{segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete,段.ms=600000}。仅允许的配置:[retention.ms, cleanup.policy]

我不知道为什么会发生此错误,以及可以采取什么措施。我构建 KStream 和 KTable 的方式是否不正确?或者也许是 bluemix 上的消息中心?

已解决:

在我标记为正确的答案下方添加来自 cmets 的摘录。原来我的 StreamsConfig 很好,而且(目前)消息中心方面存在问题,但有一个解决方法:

原来 Message Hub 在使用 Kafka Streams 1.1 创建重新分区主题时存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后我会发表另一条评论

非常感谢您的帮助!

【问题讨论】:

  • 您可以添加您的 Kafka Streams 应用程序配置(或属性文件)吗?

标签: java apache-kafka ibm-cloud apache-kafka-streams message-hub


【解决方案1】:

Message Hub 有一些restrictions 关于创建主题时可以使用的配置。

根据您收到的 PolicyViolationException,您的 Streams 应用程序似乎尝试使用我们不允许的一些配置:

  • segment.index.bytes
  • segment.bytes
  • segment.ms

我猜你在 Streams 配置中的某个地方设置了这些,它们应该被删除。

请注意,您还需要在配置中将 StreamsConfig.REPLICATION_FACTOR_CONFIG 设置为 3 才能使用我们的 docs 中提到的 Message Hub。

【讨论】:

  • 感谢您的回复!我认为你绝对是对的。我添加了我的流配置,但我认为使它与 Message Hub(MH) 一起工作的必要条件已经到位。至少根据文档。我不明白的是,当我不想这样做时,为什么我会得到“无法创建主题”,除非 KTable 算作一个?可以通过在MH上分配一个主题来解决吗?就像我在问题开始时写的那样,我想从 MH 计算主题中 ID 的出现次数,它是否与 MH 有更多的关系,而不仅仅是听主题?
  • 是的,您的转换逻辑将创建“内部”主题,请参阅kafka.apache.org/11/documentation/streams/developer-guide/…。您可以事先手动预先创建它们,但让 Streams 来做通常更容易。否则,我认为您的逻辑看起来不错。
  • 我明白了。因此,当我尝试创建 KTable 时,我收到错误消息,因为我被限制在 Message Hub 上以这种方式创建主题。如果我拥有的代码是从 MH 主题中检索消息并尝试在 MH 上创建内部主题(KTable),我是否理解正确?有没有办法改变我的代码以使用 Streams 在其他地方创建 KTable?或者我是否需要另一个 Kafka 服务器来实现我所追求的? MH 可以处理 KTables 吗?抱歉所有问题,我非常感谢您的帮助。
  • 我设法重现了您的问题。事实证明,在使用 Kafka Streams 1.1 创建重新分区主题时,Message Hub 存在问题。在我们进行修复时,您需要手动创建主题 KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition。它需要与您的输入主题(myTopic)一样多的分区,并将保留时间设置为最大值。修复后我会发表另一条评论
  • 我完全忘记在这里发帖了,但是这个问题已经在几周前修复了
猜你喜欢
  • 2017-08-13
  • 1970-01-01
  • 1970-01-01
  • 2019-10-26
  • 2017-07-28
  • 2020-01-30
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多