【问题标题】:AWS Simple workflow notification techniqueAWS 简单工作流通知技术
【发布时间】:2015-08-30 10:42:13
【问题描述】:

我是 AWS SWF 的新手,我的任务是仅在第一次尝试活动失败时发出通知(通过电子邮件)。我使用Settable<Boolean> 作为我的标志,但该值不可靠,因为工作流正在异步运行。这是我的代码:

 final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
 final Settable<Boolean> notifyException = new Settable<>();

    new TryCatch() {
        @Override
        protected void doTry() throws Throwable {
            asyncExecutor.execute(() -> new TryCatch() {
                @Override
                protected void doTry() throws Throwable {
                    Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
                }

                @Override
                protected void doCatch(Throwable e) throws Throwable {
                    if (!notifyException.isReady()) {
                        // PERFORM notification service here!!
                        notifyException.set(true);
                    } else {
                        // DO NOTHING. Notification is already sent the first time :)
                    }

                    throw e;
                }
            });
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            System.out.println("======");
            System.out.println("CATCH ALL!! " + e.getMessage());
            System.out.println("======");
        }
    };

notifyException 的值始终在变化,即使我在 if statement 内将其值明确设置为 true。我的代码的结果是它将执行超过 1 个notification service

====更新===

 final AsyncExecutor asyncExecutor = new AsyncRetryingExecutor(retryPolicy, workflowClock);
 final Settable<Boolean> notifyException = new Settable<>();

    new TryCatch() {
        @Override
        protected void doTry() throws Throwable {
            asyncExecutor.execute(() -> new TryCatch() {
                @Override
                protected void doTry() throws Throwable {
                    Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
                }

                @Override
                protected void doCatch(Throwable e) throws Throwable {
                    if (!notifyException.isReady()) {
                        activityOneClient.notify();
                        notifyException.set(true);
                    }
                    throw e;
                }
            });
        }

        @Override
        protected void doCatch(Throwable e) throws Throwable {
            System.out.println("======");
            System.out.println("CATCH ALL!! " + e.getMessage());
            System.out.println("======");
        }
    };

当我重新抛出异常时,activityOneClient.notify() 只会在asyncExecutor 的最后一次重试时执行,所以如果我不重新抛出,activityOneClient.notify() 将自动执行。我只需要在第一次发生异常时通知。

【问题讨论】:

    标签: amazon-web-services spring-boot amazon-swf


    【解决方案1】:

    代码看起来没问题(还没有运行)。

    您的问题出在其他地方。流框架的工作方式是,每次有工作要做时,工作流工作者或活动工作者拿起任务,运行它并报告结果。报告结果意味着执行的结果被捕获在工作流历史中。

    每当历史记录发生变化时,决策者(工作流程)工作人员就会运行并决定接下来应该发生什么。在幕后,决策者只是重播所有历史并从一开始就做出所有决定。现在,如果决策与历史上的内容相匹配,决策者会继续进行,直到遇到新事物。

    简单的例子。假设您有一个包含 3 个步骤的工作流程:

    • 活动1
    • activity2 - 取决于 activity1 的结果
    • activity3 - 取决于 activity2 的结果

    [请记住,多个活动可以同时运行,如果一个活动的输出和另一个活动的输入之间没有联系,流程框架将继续前进并并行安排多个活动。保持简单,以便您了解想法]

    在 T0,当工作流启动时没有历史记录,决策程序运行并计划 activity1。

    在 T1,activity1 工作人员接手任务,执行它并报告结果。

    在 T2,由于 activity1 更新历史记录,决策者被安排并运行 + 它读取历史记录。它看到activity1应该运行,它在历史记录中看到它,它看到它完成,它安排activity2

    在 T3,activity2 工作人员接手任务,执行它并报告结果。

    在 T4,由于 activity2 更新历史,决策者被安排并运行 + 它读取历史。它看到activity1应该运行,它在历史中看到它,它看到它完成了。它现在看到activity2应该运行,它在历史记录中看到它,它看到它已经完成并继续到activity3。它安排活动3。

    等等。

    您的问题是,决定流程的决定程序中的代码每次决定程序运行时都运行(每个决定)。因此,当决策者到达该活动时,它将看到它已运行并已抛出异常,并且它将通过设置标志的代码+发送电子邮件。

    当指数重试开始时,标志将被设置,但发送电子邮件的代码将在每个决策上运行(决策者基本上是无状态的,状态是基于历史构建的)。

    在这种特殊情况下,您可以做的是移动在其自己的活动中发送电子邮件的部分。这样决策者将在历史记录中看到它并快进,而不是每次都运行它。

    【讨论】:

    • 好的 :) 我明白了。我忘记了决策者是无国籍的(谢谢)。现在,我的问题是如何知道抛出的异常是否是该特定工作流程中发生的第一个异常。我的客户只需要在 activityOne 发生异常时通知一次,如果 activityOne 再次抛出错误,通知服务不应该运行。
    • 我个人会做的是捕获活动中的异常并通过返回码表明活动不成功。
    • 好吧,activityOne 也是无状态的。如何存储或说下一次发生的异常不需要通知服务。
    • 当决策者重放活动时,它不会多次执行,因为它从历史中知道它已经执行了。因此,您的代码应该无需修改即可工作。只需在此处更改“// PERFORM 通知服务!!”使用通知活动调用进行评论。
    • 嗨米尔恰。感谢您的回答。我真的相信这是可行的,但是有时即使在我重建应用程序之后 doCatch() 中的活动也没有被执行(我真的确定它应该被执行)。有什么想法吗?
    【解决方案2】:

    为了只通知第一个抛出的异常,我所做的是:

    @Override
        protected void doTry() throws Throwable {
            asyncExecutor.execute(() -> new TryCatchFinally() {
    
                Throwable throwable = null;
                @Override
                protected void doTry() throws Throwable {
                    Promise<ActivityOne> activityOne = activityOneClient.performAction(activityOneRequest);
                }
    
                @Override
                protected void doCatch(Throwable e) throws Throwable {
                    if (!notifyException.isReady()) {
                        activityOneClient.notify();
                        notifyException.set(true);
                    }
                    throwable = e;
                }
    
                @Override
                protected void doFinally() throws Throwable {
                    if (throwable != null) {
                        throw throwable;
                    }
                }
            });
        }
    

    【讨论】:

      猜你喜欢
      • 2012-03-13
      • 2011-11-19
      • 2015-05-01
      • 1970-01-01
      • 2020-01-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多