【问题标题】:Solutions to fix stuck timers / activities in Cadence/SWF/StepFunctions在 Cadence/SWF/StepFunctions 中修复卡住的计时器/活动的解决方案
【发布时间】:2021-03-15 00:46:26
【问题描述】:

因此计时器在CadenceSWFStep functions 等工作流引擎中是持久的。 持久定时器对于需要等待很长一段时间,然后醒来执行一些业务逻辑的用例很有用。由于持久性,它对各种故障具有弹性,从而为开发人员提供更好的编程体验和模型。

但是,如果您想在计时器开始后更改它怎么办?像这个例子:

@Override
public void sampleWorkflowWithTimer(Input input){
  //...
  //some business logic before the timer
  //use a durable timer waiting for 7 days
  Workflow.sleep(Duration.ofDays(7));
  //send an email after the timer fires 
  activities.sendEmailReminder(input);
  //continue with other business 
  //...
}

计时器启动后,您可能希望计时器改为等待 3 天,甚至取消它。 如果您只是将工作流代码更改为此会怎么样?

@Override
public void sampleWorkflowWithTimer(Input input){
  //...
  //some business logic before the timer
  //use a durable timer waiting for 73 days 
  Workflow.sleep(Duration.ofDays(3));
  //send an email after the timer fires 
  activities.sendEmailReminder(input);
  //continue with other business after the timer fires 
  //...
}

这不起作用!

在 SWF、Step Functions 和 Cadence 中,这仅适用于尚未启动计时器的新工作流执行。但是,您真正想要解决的是“卡住”计时器等待 7 天的工作流程。

在 SWF 和 Cadence 中,这让事情变得更糟——“卡住”的工作流现在真的会因为“非确定性错误”(又名 NDE)而卡住,直到工作流超时。因为在幕后,Cadence/SWF 将持久时间变成了工作流历史中的计时器事件,带有计时器值。在重放期间,工作流期望看到存在完全相同的计时器值的计时器。看到不同的计时器值会导致工作流中的 NDE。

同样对于活动,当我们使用非常大的超时时它可能会卡住,但它会卡住,因为有一个进程/线程/RPC调用卡住了,或者在部署期间主机被杀死了。

那么解决办法是什么?

【问题讨论】:

    标签: aws-step-functions amazon-swf cadence-workflow uber-cadence


    【解决方案1】:

    计时器卡住的解决方案

    在 Step 函数中,没有真正的解决方案。因为不可能更改已经开始运行的工作流的状态机。因此,在 Step 函数中使用持久的 timer(wait) 时必须小心,尤其是当计时器值太大而可能会倒退时。您可以将大计时器值分解为较小的增量,并添加一些检查点,或者使用活动来模拟计时器。这样工作肯定很痛苦。

    在 SWF 中有一个复杂的解决方案,因为 SWF 中对工作流 versioning 的支持很差。基本思路如下:

    1. feature 标志添加到工作流输入。新启动的工作流可以使用 3 天作为计时器值,而启动的工作流不会受到影响。

    2. 将“Workflow.Sleep()”更改为使用 Promise,同时使用新的 Promise 来等待“操作信号”。处理操作信号时,改为等待新的定时器(或跳过定时器)。请注意,这是向后兼容的更改,因为等待信号不会涉及任何工作流历史事件。

    3. 向所有已启动的工作流发送操作信号。如果您要发送信号的工作流程太多,并且没有找到和发送信号的好方法,那么这里可能会很乏味。

    这种方法也适用于 Cadence,它与解决方案 #3 非常相似。

    此答案的其余部分描述了您可以在 Cadence 中采用的三种不同方法。

    解决方案 #1:重置工作流程

    这可能是最容易理解和应用的解决方案。 与上面的示例相同,您将工作流更新为使用 3 天作为计时器值,并让一些现有工作流陷入 NDE 状态。然后收集这些工作流,并使用“LastDecisionCompleted”resetType 重置它们。

    ./cadence --do <domain> wf reset --resetType LastDecisionCompleted -w <workflowID> -r <runID> --reason "some reason"
    

    LastDecisionCompleted resetType 意味着忘记上一个工作流决策任务的结果。在这种情况下,它正是安排了 7 天计时器的那个。

    如果您有很多要重置的命令,您可能需要使用批量重置命令。 有关重置功能,请参阅CLI document

    在幕后,重置将使卡住的工作流程忘记最后 7 天的计时器,并从安排计时器之前继续。由于代码已更新为使用 3 天计时器,因此工作流现在将按预期运行。

    解决方案 #2:版本控制 + 批量重置

    Cadence 功能更强大workflow versioning support

    “getVersion 用于安全地对工作流定义执行向后不兼容的更改。不允许在有工作流运行时更新工作流代码,因为它会破坏确定性。解决方案是同时拥有用于重放现有工作流的旧代码以及第一次执行时使用的新代码。

    getVersion 在第一次执行时返回 maxSupported 版本。此版本作为标记事件记录到工作流程历史记录中。即使更改了 maxSupported 版本,录制的版本也会在重播时返回。 DefaultVersion 常量包含之前未版本化的代码版本。”

    我们可以使用此版本控制来更改计时器:

    @Override
    public void sampleWorkflowWithTimer(Input input){
      //...
      //some business logic before the timer
      //use a durable timer waiting for 7 days
      int version = Workflow.getVersion("timerChange", Workflow.DEFAULT_VERSION, 1);
      if (version == Workflow.DEFAULT_VERSION) {
         Workflow.sleep(Duration.ofDays(7));
      } else {
       // Because the workflow has waited for some time,
       // you may want to sleep for 3-timeAlreadyElapsed instead
         Workflow.sleep(Duration.ofDays(3));
      }
      //send an email after the timer fires 
      activities.sendEmailReminder(input);
      //continue with other business 
      //...
    }
    

    请注意,使用这种强大的版本控制而不是 feature flag to the workflow input in SWF 的好处是,不仅新启动的工作流将使用 3 天计时器值,而且已经启动的工作流将使用 3 天,只要它们尚未启动 7 天计时器。

    然后我们可以修复启动 7 天计时器的工作流程。我们将使用 LastDecisionCompleted resetType 重置这些工作流。但是,由于版本控制,reset 变得更容易使用 --

    Cadence 会自动将search attributes 添加到使用版本控制的工作流中。它允许您在历史记录中查找具有特定版本的工作流。在这种情况下,将启动 3 天计时器的工作流将具有搜索属性“CadenceChangeVersion”,其值为“timerChange-1”。因此,要找到卡住的工作流,我们可以使用以下 SQL:

    WorkflowTYpe = “YourWorkflowType” AND CloseTime = missing AND StartTime < “NewCodeDeployTime” AND CadenceChangeVersion != “timerChange-1”
    

    在哪里 WorkflowTYPE = “YourWorkflowType” 仅表示该特定工作流类型, CloseTime = 缺失意味着仅打开工作流, StartTime

    上述 SQL 可以包含从旧代码开始但尚未启动计时器的工作流。如果您想更精确,您还可以将 HistoryLength 包含到 SQL 中。您需要弄清楚卡在 7 天计时器上的历史长度(事件计数)的大致范围是多少。

    获得 SQL 后,使用批量重置命令重置工作流程:

    ./cadence wf reset-batch --query ' WorkflowType= “YourWorkflowType” AND CloseTime = missing AND StartTime < "NewCodeDeployTime" AND CadenceChangeVersion != "timerChange-1" ' --resetType LastDecisionCompleted --reason "some reason"
    

    解决方案 #3:版本控制 + 批量信号

    这种方法与我们为 SWF 描述的方法非常相似。 首先,我们使用版本控制来更改工作流代码,而不是功能标志。

    @Override
    public void sampleWorkflowWithTimer(Input input){
      //...
      //some business logic before the timer
      //use a durable timer waiting for 7 days
      int version = Workflow.getVersion("timerChange", Workflow.DEFAULT_VERSION, 1);
      if (version == Workflow.DEFAULT_VERSION) {
         final boolean received = Workflow.await(Duration.ofDays(7), 
                                      () -> this.operationSignal == true);
         if(received){
            // Because the workflow has waited for some time,
            // you may want to sleep for 3-timeAlreadyElapsed instead
            Workflow.sleep(Duration.ofDays(3));
         }
      } else {
         Workflow.sleep(Duration.ofDays(3));
      }
      //send an email after the timer fires 
      activities.sendEmailReminder(input);
      //continue with other business 
      //...
    }
    
    @Override
      public void operationSignal(final String signal) {
        // you can add more cases to this operationSignal
        this.operationSignal = true;
        LOGGER.info("receive operationSignal: " + signal );
      }
    

    如上所述,此版本控制与 SWF 输入功能标志不同。以旧代码开始的工作流也可以使用 3 天计时器,只要它们尚未开始 7 天。

    值得一提的是,Workflow.Sleep() Workflow.await() 的变化是向后兼容的。那是因为他们都安排了一个具有相同值的计时器——7天。等待信号不需要任何历史事件。

    现在您可以向所有等待的工作流发送 operationSignal。像上面一样,我们可以使用 SQL 来搜索所有这些工作流。然后使用batch signal command 向他们发送信号。

    ./cadence wf batch start --query 'WorkflowType= “YourWorkflowType” AND CloseTime = missing AND StartTime < "NewCodeDeployTime" AND CadenceChangeVersion != "timerChange-1" ' --reason "some reason" --bt signal --input "anything"
    --sig SampleWorkflow::operationSignal 
    

    一旦卡住的工作流接收到一个 operationSignal,它将从 7 天计时器中解除阻塞,并以“已接收”逻辑执行新代码。

    请注意,在 Cadence 中将信号批量发送到工作流比 SWF 更方便。批处理作业保证在 Cadence 中作为系统工作流执行。

    卡住的活动怎么办

    如果您安排并启动了一个超时时间很长但后来不想等待它完成/超时/失败的活动怎么办?由于等待活动,工作流现在卡住了。

    预防

    在进入解决方案之前,需要注意的是,在 Cadence/SWF/Step 函数中使用较长的活动超时而没有正确的心跳被认为是一种反模式。所以你应该从一开始就避免这种情况。如果您希望某个活动运行很长时间,例如> 10分钟,您应该使用设置适当的心跳超时值,并在活动中调用心跳API。在这个答案中查看更多 details Cadence。

    使用心跳不仅对于由于某些 IO/依赖性而卡住的活动很重要。在工作人员部署或活动工作人员失败并且您需要重新启动活动的情况下,这种情况更有可能发生。

    class MyActivitiesImpl implements MyActivities {
    
        @Override
        public String myHeartbeatActivity() {
          ...
          // after any IO/RPC call/some time period
          Activity.heartbeat(heartbeatDetails);
          ...
    

    请注意,对 cadence 服务器的实际心跳调用已通过客户端 SDK 进行了优化。因此,如果您每 1 毫秒调用一次,则不会有任何执行问题。 SDK 在内部将决定在心跳超时时间达到 80% 左右时进行 RPC 调用。

    那么,如果我的活动已经卡住了怎么办

    然而,错误总是会发生,因为我们都是人。

    假设我们的工作流代码如下所示。

    @Override
    public void sampleWorkflowWithLongTimeoutActivity(Input input){
      //...
      //some business logic before the activity
      activities.helloActivity(input);
      //continue with other business 
      //...
    }
    

    现在许多工作流都停留在 helloActivity 上。 由于此活动由于不正确的超时而被卡住,为了缓解此问题,您应该做的第一件事是更新活动选项以使用正确的超时值,或者如果活动是长时间运行的活动,则使用心跳。

    更新活动超时选项是对 Cadence 和 SWF 的向后兼容更改,无需任何版本控制。但是,在 Step 函数的state machines 中指定了活动超时,并且状态机的任何微小更改仅对新的工作流执行生效。因此,对于 Cadence 和 SWF,修复超时选项将适用于尚未启动活动的任何工作流,但对于 Step 函数,它仅适用于从头开始的全新工作流。

    以错误的超时值启动活动的工作流怎么办?

    显然,您在 Step Functions 状态机中无能为力。您可以终止工作流并重新启动它们,但如果工作流有一些值表明您不想重新启动,那将很乏味。那么你唯一应该做的就是在活动方面: 您应该仔细编写活动代码(确保没有死循环或死等待块) 使用适当的指标监控活动执行 您可以在活动中添加逻辑,以便在单独的线程中尽早将故障返回到状态机,如果此时重试无济于事。单独的线程会尝试在工作流端执行正确的超时(但实际上不一样)。

    解决方案 #1

    对于 SWF/Cadence,您可以对工作流代码中的卡住计时器使用类似的解决方案。

    @Override
    public void sampleWorkflowWithLongTimeoutActivity(Input input){
      //...
      //some business logic before the activity
      Promise<void> hello = Async.function(activities::helloActivity, input);
      Workflow.await(()-> hello.isCompleted() || this.operationSignal == true );
      if(this.operationSignal){
       // add your logic to handle the situation that we skip the wrong activity timeout.You may want to schedule the same activity again with correct timeouts
      }else{
      //continue with other business like before to be compatible 
      }
     
    }
    

    这里的技巧是将同步更改为异步。将单个活动从同步更改为异步通常会导致 NDE,但在这种情况下,工作流会立即等待活动和信号。在内部,这将具有相同的工作流程历史记录,因此这是一个向后兼容的更改。

    解决方案 #2

    幸运的是,如果您使用 Cadence,只需使用 LastDecisionCompletedresetType 重置工作流程,它就是救命稻草。

    如果要重置的数量太多,您也可以使用批量重置。

    解决方案 #3

    您还可以使用 CLI 命令来完成或失败活动:

    ./cadence --do <domain> wf activity complete -w <workflowID> -r <runID> --activity_id <activityID> --result <result> --identity <some_identity_string>
    

    ./cadence --do <domain> wf activity fail -w <workflowID> -r <runID> --activity_id <activityID> --reason <reason> --detail <detail> --identity <some_identity_string>
    

    【讨论】:

    • 您也可以随时在 Cadence 中使用 CLI 或 API 完成或失败任何活动。
    • 感谢@MaximFateev 添加了该选项。 (差点忘了自己写的工具,之前我个人没用过lol)。此外,如果您愿意,我也可以将 Temporal 放入此 QA 中。我认为它与 Cadence 没有什么不同,但也许您可以指出是否有。
    猜你喜欢
    • 1970-01-01
    • 2023-03-21
    • 1970-01-01
    • 2014-02-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多