【问题标题】:RocksDB exception in Kafka StreamsKafka Streams 中的 RocksDB 异常
【发布时间】:2019-03-25 11:57:30
【问题描述】:

在一个简单的 Kafka Stream 程序中,当我使用以下代码时,它可以正常工作而不会引发任何错误:

      KTable<String, Long> result= source.mapValues(textLine
      ->textLine.toLowerCase()) .flatMapValues(lowercasedTextLine ->
      Arrays.asList(lowercasedTextLine.split(" "))) .selectKey((ignoredKey,word) ->
      word) .groupByKey() .count("Counts");

      result.to(Serdes.String(), Serdes.Long(), "wc-output");

但是,当我使用以下代码时,我收到了错误:

    KStream<String, String> source = builder.stream("wc-input");
    source.groupBy((key, word) -> word).windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(5000))).count()
            .toStream().map((key, value) -> new KeyValue<>(key.key(), value))
            .to("wc-output", Produced.with(Serdes.String(), Serdes.Long()));

线程异常 “流-wordcount-b160d715-f0e0-42ee-831e-0e4eed7e9424-StreamThread-1” org.apache.kafka.streams.errors.StreamsException:异常捕获 过程。 taskId=1_0,处理器=KSTREAM-SOURCE-0000000006, 主题=流-字数-KSTREAM-AGGREGATE-STATE-STORE-0000000002-重新分区, 分区 = 0,偏移量 = 0 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232) 在 org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403) 在 org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317) 在 org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942) 在 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822) 在 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) 在 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) 引起:org.apache.kafka.streams.errors.ProcessorStateException: 开店出错 KSTREAM-AGGREGATE-STATE-STORE-0000000002:1553472000000 在位置 \tmp\kafka-streams\streams-wordcount\1_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:1553472000000 在 org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) 在 org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) 在 org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) 在 org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) 在 org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) 在 org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) 在 org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) 在 org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) 在 org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) 在 org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:100) 在 org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) 在 org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232) 在 org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245) 在 org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153) 在 org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:157) 在 org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36) 在 org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:96) 在 org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:122) 在 org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) 在 org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) 在 org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) 在 org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) 在 org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) 在 org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ... 6 更多原因:org.rocksdb.RocksDBException:创建失败 目录: H:\tmp\kafka-streams\streams-wordcount\1_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:1553472000000: org.rocksdb.RocksDB.open(Native Method) 的参数无效 org.rocksdb.RocksDB.open(RocksDB.java:231) 在 org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197)

【问题讨论】:

  • 请改进代码的格式。这将使您更容易阅读和回答您的问题。
  • 似乎 Kafka Streams 无法创建状态存储目录。您可以尝试将状态存储目录更改为另一个路径,看看是否能解决您的问题。
  • 您使用的是哪个 Kafka Streams 版本?
  • 这仍然发生在 Centos 7 和 Kafka 版本 2.1.0

标签: lambda apache-kafka kafka-consumer-api apache-kafka-streams


【解决方案1】:

当您使用窗口聚合时,以不同的方式存储名称,并且 Kafka 1.0.0 中存在影响 Windows 操作系统的错误:窗口存储的名称包含 Windows 操作系统上不允许的 :。该错误已在版本1.0.11.1.0 中修复

参照。 https://issues.apache.org/jira/browse/KAFKA-6167

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-10-06
    • 2021-10-12
    • 1970-01-01
    • 1970-01-01
    • 2017-10-07
    • 2017-06-08
    • 2020-09-05
    • 1970-01-01
    相关资源
    最近更新 更多