【问题标题】:Set timestamp in output with Kafka Streams使用 Kafka Streams 在输出中设置时间戳
【发布时间】:2018-09-21 06:52:54
【问题描述】:

我在 Kafka 主题“原始数据”中获取 CSV,目标是通过发送具有正确时间戳(每行不同)的另一个主题“数据”中的每一行来转换它们。

目前,我有 2 个主播:

  • 用于拆分“原始数据”中的行,将它们发送到“内部”主题(无时间戳)
  • 带有TimestampExtractor 的一个消耗“内部”并将它们发送到“数据”。

我想通过直接设置时间戳来删除这个“内部”主题的使用,但我找不到方法(时间戳提取器仅在消费时使用)。

我在文档中偶然发现了这一行:

请注意,可以在处理器 API 中更改描述默认行为,方法是在调用 #forward() 时为输出记录显式分配时间戳。

但我找不到任何带有时间戳的签名。它们是什么意思?

你会怎么做?

编辑: 需要明确的是,我有一个 Kafka 主题,其中包含一条消息,其中包含事件时间和一些值,例如:

2018-01-01,hello 2018-01-02,world (这是一条消息,而不是两条)

我想在另一个主题中获取两条消息,并将 Kafka 记录时间戳设置为它们的事件时间(2018-01-01 和 2018-01-02),而不需要中间主题。

【问题讨论】:

  • 它需要是 Kafka Streams,还是您对 KSQL 的示例感兴趣?
  • 我只使用 Kafka Streams。如果在 KSQL 中是可能的,那就意味着在 Kafka Streams 中有一种方法。
  • 没关系,我会留给 Kafka Streams 专家来回答 :)

标签: apache-kafka apache-kafka-streams


【解决方案1】:

为输出设置时间戳需要 Kafka Streams 2.0,并且仅在 Processor API 中受支持。如果您使用 DSL,则可以使用 transform() 来使用这些 API。

正如您所指出的,您将使用context.forward()。电话是:

stream.transform(new TransformerSupplier() {
  public Transformer get() {
    return new Transformer() {
      // omit other methods for brevity
      // you need to get the `context` from `init()`

      public KeyValue transform(K key, V value) {
        // some business logic

        // you can call #forward() as often as you want
        context.forward(newKey, newValue, To.all().withTimestamp(newTimestamp));

        return null; // only return data via context#forward()
      }
    }
  }
});

【讨论】:

  • 不幸的是,这种方法不起作用:@Override public void forward(final K key, final V value, final To to) { toInternal.update(to); if (toInternal.hasTimestamp()) { recordContext.setTimestamp(toInternal.timestamp()); } 因为recordContext中的NPE = null;
  • 截取的代码不完整。注意注释“你需要从 init() 获取上下文”——你这样做了吗?
  • 我做到了。即使在init() 中初始化了上下文,recordContext 也存在无法在客户端代码中初始化的问题。
  • java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:110)
  • 也许开始一个新问题并分享您的代码?不确定atm可能是什么问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-06-24
  • 2020-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-07-14
  • 1970-01-01
相关资源
最近更新 更多