【问题标题】:How can you set the max.message.bytes of a state store changelog topic?如何设置状态存储更改日志主题的 max.message.bytes?
【发布时间】:2018-05-01 02:28:22
【问题描述】:

我有一个 Kafka Streams 应用程序,其中包含高达 10MiB 的消息。我想将这些消息保存在状态存储中,但 Kafka Streams 无法生成内部更改日志主题:

2017-11-17 08:36:19,792 ERROR RecordCollectorImpl - task [4_5] Error sending record to topic appid-statestorename-state-store-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2017-11-17 08:36:20,583 ERROR StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 4_5 due to flush state:

通过添加一些日志记录,看起来内部主题的默认 max.message.bytes 设置为 1MiB。

集群的默认 max.message.bytes 设置为 50MiB。

是否可以调整 Kafka Streams 应用内部主题的配置?


解决方法是启动流应用程序,让它创建主题,然后更改主题配置。但这感觉就像一个肮脏的黑客。

./kafka-topics.sh --zookeeper ... \
      --alter --topic appid-statestorename-state-store-changelog \
      --config max.message.bytes=10485760

【问题讨论】:

  • AFAIK 无法以编程方式更改此配置。作为一种可能的解决方法,您可以使用自定义参数预先创建所有主题,然后启动您的应用程序。
  • atm 无法为内部主题指定自定义属性。这是一个已知问题,将来肯定会修复。随意创建一个 JIRA:issues.apache.org/jira/browse/KAFKA-1?jql=project%20%3D%20KAFKA - 因此您的解决方法是正确的。注意,您也可以通过Topology#describe()(或KafkaStreams#toString()——旧API)获取更新日志主题的名称
  • 感谢大家的快速回复。我们使用 Ansible 维护我们的主题创建/配置,因此目前我更喜欢预先创建主题,而不是同步应用程序部署和主题配置。

标签: apache-kafka apache-kafka-streams


【解决方案1】:

Kafka 1.0 允许通过StreamsConfig 为内部主题指定自定义主题属性。

您可以为这些配置添加前缀"topic.",并且可以使用TopicConfig 中定义的任何配置。

更多详情请参阅原始 KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-01-28
    • 1970-01-01
    • 1970-01-01
    • 2020-06-22
    • 2019-07-08
    • 2020-10-29
    • 2016-06-03
    相关资源
    最近更新 更多