【问题标题】:Kafka streams fail on decoding timestamp metadata inside StreamTaskKafka 流无法解码 StreamTask 中的时间戳元数据
【发布时间】:2020-04-01 14:01:48
【问题描述】:

在启动应用程序期间,我们在 Kafka Streams 上遇到了奇怪的错误

java.lang.IllegalArgumentException: Illegal base64 character 7b
    at java.base/java.util.Base64$Decoder.decode0(Base64.java:743)
    at java.base/java.util.Base64$Decoder.decode(Base64.java:535)
    at java.base/java.util.Base64$Decoder.decode(Base64.java:558)
    at org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:985)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:303)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:265)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:71)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

因此,有关失败流的错误:ERROR KafkaStreams - stream-client [xxx] All stream threads have died. The instance will be in error state and should be closed.

根据org.apache.kafka.streams.processor.internals.StreamTask内部的代码,由于解码时间戳元数据(StreamTask.decodeTimestamp())时出错而发生故障。它发生在 prod 上,无法在舞台上重现。 此类错误的根本原因可能是什么?

额外信息:我们的应用程序使用 Kafka-Streams 并使用相同的 application.idstate.dir 使用来自多个 kafka 代理的消息(实际上我们从一个代理切换到另一个代理,但在一段时间内我们连接到两个代理,所以我们有两个 kafka 流,每个代理一个)。据我了解,消费者组位于代理端(所以应该不是问题),但状态目录在客户端。也许由于对两个 kafka 流使用相同的state.dir 而发生了一些竞争情况?这可能是根本原因吗?

我们使用kafka-streams v.2.4.0kafka-clients v.2.4.0、Kafka Broker v.1.1.1,配置如下:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
default.deserialization.exception.handler: org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
commit.interval.ms: 5000
num.stream.threads: 1
auto.offset.reset: latest

【问题讨论】:

  • 不清楚这是怎么发生的。 Kafka Streams 会在提交元数据中编码一个时间戳,并在稍后尝试解码这个时间戳。不确定此元数据是否/如何损坏。会不会是其他一些客户端进行了手动提交,包括相应application.id(即group.id)的一些提交元数据?
  • @MatthiasJ.Sax 我更新了帖子,提供了有关为两个经纪人重用相同的state.dir 值的额外信息。也许它以某种方式损坏了提交元数据,或者发生了竞争条件。手动提交是不可能的,我们只通过 Kafka Streams 连接
  • 状态目录似乎与此问题无关。也许这是一个短暂的网络问题“翻转”了一点? -- 如果你真的想深入挖掘,你可以通过bin/kafka-dump-log.sh 检查__committed_offsets 主题,看看是否存在损坏的元数据——但这确实是一个兔子洞。 -- 或者可能有一些错误,但不清楚它可能是什么。
  • 谢谢你,我会试试看

标签: apache-kafka apache-kafka-streams


【解决方案1】:

最后,我们找出了一些消费者群体损坏元数据的根本原因。 这是我们的内部监控工具之一(使用pykafka 编写),它通过暂时不活动的消费者组破坏了元数据。 元数据未加密并包含无效数据,如下所示:{"consumer_id": "", "hostname": "monitoring-xxx"}。 为了了解消费者元数据中究竟有什么,我们可以使用以下代码:

Map<String, Object> config = Map.of( "group.id", "...", "bootstrap.servers", "...");
String topicName = "...";
Consumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
Set<TopicPartition> topicPartitions = kafkaConsumer.partitionsFor(topicName).stream()
        .map(partitionInfo -> new TopicPartition(topicName, partitionInfo.partition()))
        .collect(Collectors.toSet());
kafkaConsumer.committed(topicPartitions).forEach((key, value) ->
    System.out.println("Partition: " + key + " metadata: " + (value != null ? value.metadata() : null)));

修复已损坏元数据的几个选项:

  • 将消费者组更改为新的。请注意,根据latestearliest 偏移重置策略,您可能会丢失或重复消息。所以在某些情况下,这个选项可能是不可接受的
  • 手动覆盖元数据(时间戳按照StreamTask.decodeTimestamp()内部的逻辑编码):

    Map<TopicPartition, OffsetAndMetadata> updatedTopicPartitionToOffsetMetadataMap = kafkaConsumer.committed(topicPartitions).entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, (entry) -> new OffsetAndMetadata((entry.getValue()).offset(), "AQAAAXGhcf01"))); kafkaConsumer.commitSync(updatedTopicPartitionToOffsetMetadataMap); 或将元数据指定为 Af//////////,这意味着 Kafka Streams 中的 NO_TIMESTAMP

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-12-08
    • 2021-12-04
    • 2023-03-27
    • 2020-05-27
    • 2017-01-23
    • 2021-12-21
    • 2020-01-30
    相关资源
    最近更新 更多