【问题标题】:Set timestamp in output with Kafka Streams fails for transformations使用 Kafka Streams 在输出中设置时间戳无法进行转换
【发布时间】:2018-11-06 09:42:34
【问题描述】:

假设我们有一个转换器(用 Scala 编写)

new Transformer[String, V, (String, V)]() {
  var context: ProcessorContext = _

  override def init(context: ProcessorContext): Unit = {
    this.context = context
  }

  override def transform(key: String, value: V): (String, V) = {
    val timestamp = toTimestamp(value)
    context.forward(key, value, To.all().withTimestamp(timestamp))
    key -> value
  }

  override def close(): Unit = ()
}

其中toTimestamp 只是一个函数,它返回从记录值中获取的时间戳。一旦它被执行,就会有一个 NPE:

Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
    at CustomTransformer.transform()
    at CustomTransformer.transform()
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
    at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
    at 

实际上发生的是ProcessorContextImpl 失败:

public <K, V> void forward(final K key, final V value, final To to) {
    toInternal.update(to);
    if (toInternal.hasTimestamp()) {
        recordContext.setTimestamp(toInternal.timestamp());
    }
    final ProcessorNode previousNode = currentNode();

因为recordContext 没有初始化(只能由KafkaStreams 在内部完成)。

这是一个后续问题Set timestamp in output with Kafka Streams 1

【问题讨论】:

  • 你是如何在代码块中解决这个问题的?我使用ProcessorSupplier#get( Processor::new )。但不为我工作。问题,RecordContext is null

标签: scala apache-kafka apache-kafka-streams


【解决方案1】:

如果您使用transformer,则需要确保在调用TransformerSupplier#get() 时创建了一个新的Transformer 对象。 (参见https://docs.confluent.io/current/streams/faq.html#why-do-i-get-an-illegalstateexception-when-accessing-record-metadata

在最初的问题中,我认为这是关于导致 NPE 的 context 变量,但现在我意识到它与 Kafka Streams 内部有关。

Scala API 在 2.0.0 中有一个错误,可能会导致重复使用相同的 Transformer 实例 (https://issues.apache.org/jira/browse/KAFKA-7250)。我认为您遇到了这个错误。稍微重写你的代码应该可以解决问题。请注意,Kafka 2.0.1 和 Kafka 2.1.0 包含一个修复程序。

【讨论】:

    【解决方案2】:

    @matthias-j-sax 如果处理器在 Java 代码中重用,则行为相同。

        Topology topology = new Topology();
        MyProcessor myProcessor = new MyProcessor();
        topology.addSource("source", "topic-1")
                .addProcessor(
                        "processor",
                        () -> {
                            return myProcessor;
                        },
                        "source"
                )
                .addSink("sink", "topic-2", "processor");
        KafkaStreams streams = new KafkaStreams(topology, config);
        streams.start();
    

    【讨论】:

    • 是的,但是在 Scala API 中,可以确保每次都创建一个新实例——在 2.0.0 中,如果您编写惯用的 Scala 并且需要通过编写非惯用的 Scala -- 在2.0.1 中,这是固定的,如果您也编写惯用的 Scala(它应该是这样),则会创建一个新实例。不幸的是,在 Java 中,无法防范这种情况。
    猜你喜欢
    • 1970-01-01
    • 2020-01-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-09
    • 2018-02-28
    • 2019-06-24
    • 1970-01-01
    相关资源
    最近更新 更多