【发布时间】:2018-11-06 09:42:34
【问题描述】:
假设我们有一个转换器(用 Scala 编写)
new Transformer[String, V, (String, V)]() {
var context: ProcessorContext = _
override def init(context: ProcessorContext): Unit = {
this.context = context
}
override def transform(key: String, value: V): (String, V) = {
val timestamp = toTimestamp(value)
context.forward(key, value, To.all().withTimestamp(timestamp))
key -> value
}
override def close(): Unit = ()
}
其中toTimestamp 只是一个函数,它返回从记录值中获取的时间戳。一旦它被执行,就会有一个 NPE:
Exception in thread "...-6f3693b9-4e8d-4e65-9af6-928884320351-StreamThread-5" java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
at CustomTransformer.transform()
at CustomTransformer.transform()
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:302)
at org.apache.kafka.streams.scala.kstream.KStream$$anon$1$$anon$2.transform(KStream.scala:300)
at
实际上发生的是ProcessorContextImpl 失败:
public <K, V> void forward(final K key, final V value, final To to) {
toInternal.update(to);
if (toInternal.hasTimestamp()) {
recordContext.setTimestamp(toInternal.timestamp());
}
final ProcessorNode previousNode = currentNode();
因为recordContext 没有初始化(只能由KafkaStreams 在内部完成)。
【问题讨论】:
-
你是如何在代码块中解决这个问题的?我使用
ProcessorSupplier#get( Processor::new )。但不为我工作。问题,RecordContextis null
标签: scala apache-kafka apache-kafka-streams