【问题标题】:Pausing Processing on Flink KeyedStream在 Flink KeyedStream 上暂停处理
【发布时间】:2018-10-26 21:07:34
【问题描述】:

我有一个 Flink 流应用程序,它需要能够在特定键控流上“暂停”和“取消暂停”处理。 “处理”意味着只对流执行一些简单的异常检测。

我们正在考虑的流程是这样的:

命令流,ProcessCommandPauseCommandResumeCommand,每个命令都有一个用于 KeyByid

ProcessCommands 会在处理之前检查密钥是否暂停,如果没有则缓冲。

PauseCommands 将暂停密钥的处理。

ResumeCommands 将取消暂停键的处理并刷新缓冲区。

这个流程看起来合理吗?如果是,我可以使用split 运算符之类的东西来实现吗?

示例流,省略了单个记录时间戳:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 

【问题讨论】:

  • 那么,您的实际流(包含记录以及id)将接收命令作为记录的一部分,并且将使用这些命令控制流处理对吗?
  • 是的,完全正确@shriyog。每条记录都有一个id 和一个command
  • processpause 命令有什么区别?您能否举一个展示输入命令流和预期行为的示例?
  • @shriyog,这些编辑是否能更好地解释?感谢您的帮助!

标签: apache-flink flink-streaming


【解决方案1】:

这可以使用Flink's Window operator 来实现。首先,通过应用map 操作创建基于POJOtuple 的流。

然后,根据您的需要,您可以在该流上使用keyBy 来获取keyedStream

现在,通过结合使用基于时间的无限 windowtriggerwindow function,您可以实现命令流的切换行为。

基本上,您可以使用windows 作为缓冲区,在接收到暂停记录后,它会保存进程记录,直到接收到恢复记录。您将编写一个自定义触发器,根据您的场景驱逐窗口(缓冲区)。

以下是 Trigger 的自定义实现,具有 onElement() 覆盖方法。

/**
 * We trigger the window processing as per command inside the record. The
 * process records are buffered when a pause record is received and the
 * buffer is evicted once resume record is received. If no pause record is
 * received earlier, then for each process record the buffer is evicted.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    if (element.f1.equals("pause")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;
}

查看github repository中的完整工作示例

【讨论】:

  • 非常感谢!我会实现这个然后接受答案!
  • 一切都非常清楚 - 我唯一的问题是关于 transient paused 变量。将其保存在 ValueState 中同样合理,对吧?
  • 是的,这实际上是正确的方法。托管键控状态可以在这里为我们提供帮助ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/…
  • 太好了,这就是我正在使用的!再次感谢您的所有帮助。
  • 我认为直通记录的行为方式与流程记录类似,因为每个记录上的窗口(包含 1 条记录)都会被逐出。但是如果你想让process-passthrough 记录跳过整个窗口驱逐步骤,splitting 流是最好的选择。
猜你喜欢
  • 1970-01-01
  • 2016-03-22
  • 2012-01-13
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-10
  • 1970-01-01
相关资源
最近更新 更多