【问题标题】:can I use KSQL to generate processing-time timeouts?我可以使用 KSQL 生成处理时间超时吗?
【发布时间】:2018-11-14 03:07:39
【问题描述】:

我正在尝试使用 KSQL 在时间限制内进行任何处理,并在该时间限制内获得结果。请参阅“处理时间计时器”下的Timely (and Stateful) Processing with Apache Beam,了解使用 Apache Beam 说明的相同想法。

给定:

  1. 具有唯一键的事务流;
  2. 在同一流中更新这些事务;和
  3. 下游处理器希望在事务出现在第一个流中之后的特定超时(例如 20 秒)接收更新的事务。

从概念上讲,我正在考虑创建第一个流的 KTable 以保存事务的最新状态,并使用 KSQL 通过在 KTable 中查询 (create_time + timeout)

我在 KSQL 文档中没有找到执行此操作的方法,即使有内置的 current_time,我也不确定它是否会被评估,直到另一个记录顺流而下。

如何在 KSQL 中做到这一点?我需要自定义 UDF 吗?如果在 KSQL 中做不到,我可以在 KStreams 中做吗?

=====

更新:现在 KStreams 似乎不支持这个 - Apache Flink 似乎是这个用例(以及许多其他用例)的方法。如果您知道绕过 KStreams 限制的巧妙方法,请告诉我!

【问题讨论】:

    标签: apache-flink apache-kafka-streams ksqldb


    【解决方案1】:

    查看 Kafka Streams 的 Processor API 中的 punctuate() 功能,这可能是您正在寻找的。您可以将 punctuate() 与流时间(默认值:事件时间)以及处理时间(通过PunctuationType.WALL_CLOCK_TIME)一起使用。在这里,您可以根据需要实现ProcessorTransformer,它们将使用punctuate() 来实现超时功能。

    更多信息请参见https://kafka.apache.org/documentation/streams/developer-guide/processor-api.html

    提示:您也可以在 Kafka Streams 的 DSL 中使用这样的处理器/转换器。这意味着您可以继续使用更方便的 DSL,如果您愿意,只需在基于 DSL 的代码的正确位置插入处理器/变压器。

    【讨论】:

    • 谢谢!我可能会先尝试一下 Flink API 中的高级抽象,然后再尝试。
    猜你喜欢
    • 1970-01-01
    • 2018-07-19
    • 1970-01-01
    • 1970-01-01
    • 2011-09-24
    • 2013-05-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多