【问题标题】:How to add second activity in Amazon SWF hello_sample example如何在 Amazon SWF hello_sample 示例中添加第二个活动
【发布时间】:2017-04-16 11:39:25
【问题描述】:

我已成功实现名为 hello_sample 的简单 Java Amazon SWF 示例。我创建了 ActivityWorker 可执行程序,它轮询 SWF 以获取待处理的活动任务;我创建了 WorkflowWorker 可执行程序,它轮询 SWF 以获取决策任务;我还有一个 WorkflowStarter 可执行程序,用于启动工作流的执行。一切都如预期般正常运行。我不明白的是:如何配置并添加第二个活动,使其在第一个活动完成之后运行?
WorkflowWorker:

/**
 * Decider for the hello_sample workflow: polls Amazon SWF for decision tasks and answers each one
 * by replaying the workflow history delivered with the task.
 *
 * <p>For every decision task the decider does exactly one of three things: completes the workflow
 * with the activity's result, schedules the single configured activity, or returns an empty
 * decision list because an activity is still in flight.
 */
public class WorkflowWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();

    /**
     * Polls the configured task list forever and hands every received decision task to
     * {@link #executeDecisionTask(String, List)}. Poll results that carry no task token are
     * skipped, and a failure while deciding is printed without stopping the loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        PollForDecisionTaskRequest pollRequest =
            new PollForDecisionTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST));

        while (true) {
            System.out.println(
                    "WorkflowWorker is polling for a decision task from the tasklist '" +
                    Constants.TASKLIST + "' in the domain '" +
                    Constants.DOMAIN + "'.");

            DecisionTask task = swf.pollForDecisionTask(pollRequest);

            String taskToken = task.getTaskToken();
            if (taskToken != null) {
                try {
                    // NOTE(review): only the events contained in this poll response are replayed;
                    // confirm whether long histories need additional pages to be fetched.
                    executeDecisionTask(taskToken, task.getEvents());
                }
                catch (Throwable th) {
                    // Keep the decider alive: one broken decision task must not stop polling.
                    th.printStackTrace();
                }
            }
        }
    }

    /**
     * Replays the history events and responds to the decision task.
     *
     * <p>An activity counts as "in flight" from its {@code ActivityTaskScheduled} event until a
     * terminal event ({@code ActivityTaskCompleted}, {@code ActivityTaskFailed} or
     * {@code ActivityTaskTimedOut}). Tracking a single counter this way stays correct for tasks
     * that time out before they are ever started, and for {@code ScheduleActivityTaskFailed},
     * which is recorded <em>instead of</em> {@code ActivityTaskScheduled} and therefore has
     * nothing to undo.
     *
     * @param taskToken token identifying the decision task being answered
     * @param events    workflow history events delivered with the decision task
     * @throws Throwable if reading the history or responding to SWF fails
     */
    private static void executeDecisionTask(String taskToken, List<HistoryEvent> events) throws Throwable {
        List<Decision> decisions = new ArrayList<>();
        String workflowInput = null;
        int inFlightActivities = 0;
        boolean activityCompleted = false;
        String result = null;

        System.out.println("WorkflowWorker is executing the decision task for the history events: [");
        for (HistoryEvent event : events) {
            System.out.println("  " + event);
            switch (event.getEventType()) {
                case "WorkflowExecutionStarted":
                    workflowInput = event.getWorkflowExecutionStartedEventAttributes().getInput();
                    break;
                case "ActivityTaskScheduled":
                    inFlightActivities++;
                    break;
                case "ActivityTaskCompleted":
                    inFlightActivities--;
                    activityCompleted = true;
                    result = event.getActivityTaskCompletedEventAttributes().getResult();
                    break;
                case "ActivityTaskFailed":
                case "ActivityTaskTimedOut":
                    // Terminal for the scheduled task whether or not a worker ever started it.
                    inFlightActivities--;
                    break;
                default:
                    // "ActivityTaskStarted" leaves the task in flight, and
                    // "ScheduleActivityTaskFailed" means no task was scheduled in the first
                    // place, so neither changes the count. Other events are irrelevant here.
                    break;
            }
        }
        System.out.println("]");

        if (activityCompleted) {
            // The activity produced a result: finish the workflow with it.
            decisions.add(
                new Decision()
                    .withDecisionType(DecisionType.CompleteWorkflowExecution)
                    .withCompleteWorkflowExecutionDecisionAttributes(
                        new CompleteWorkflowExecutionDecisionAttributes()
                            .withResult(result)));
        }
        else if (inFlightActivities == 0) {
            // Nothing is scheduled or running (first decision, or the previous attempt failed,
            // timed out or could not be scheduled): schedule the activity (again).
            ScheduleActivityTaskDecisionAttributes attrs =
                new ScheduleActivityTaskDecisionAttributes()
                    .withActivityType(new ActivityType()
                        .withName(Constants.ACTIVITY)
                        .withVersion(Constants.ACTIVITY_VERSION))
                    .withActivityId(UUID.randomUUID().toString())
                    .withInput(workflowInput);

            decisions.add(
                    new Decision()
                        .withDecisionType(DecisionType.ScheduleActivityTask)
                        .withScheduleActivityTaskDecisionAttributes(attrs));
        }
        // Otherwise an activity is already scheduled or running: respond with no decisions and
        // wait for the decision task that follows its completion, failure or timeout.

        System.out.println("WorkflowWorker is exiting the decision task with the decisions " + decisions);
        swf.respondDecisionTaskCompleted(
            new RespondDecisionTaskCompletedRequest()
                .withTaskToken(taskToken)
                .withDecisions(decisions));
    }
}

ActivityWorker:

/**
 * Activity worker for the hello_sample workflow: polls Amazon SWF for activity tasks, runs each
 * one, and reports the outcome (completed or failed) back to SWF.
 *
 * <p>A JVM shutdown hook sets a flag that stops the poll loop after the current iteration and
 * then waits up to 60 seconds for the loop to finish.
 */
public class ActivityWorker {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    /** Released by {@link #main} once the poll loop has exited; awaited by the shutdown hook. */
    private static final CountDownLatch waitForTermination = new CountDownLatch(1);
    /** Set by the shutdown hook; checked by the poll loop before every new poll. */
    private static volatile boolean terminate = false;

    /**
     * Prints a greeting for the input and writes the input to {@code g_species.txt} in the
     * current working directory.
     *
     * <p>The writer is closed automatically, and any I/O failure propagates to the caller so the
     * task is reported to SWF as failed instead of being reported as a success without the file
     * having been written.
     *
     * @param g_species the activity input
     * @return the unchanged input, used as the activity result
     * @throws Throwable if the file cannot be written
     */
    private static String executeActivityTask(String g_species) throws Throwable {
        String s = "   ********   Hello, " + g_species + "!";
        System.out.println(s);

        String cwd = Paths.get(".").toAbsolutePath().normalize().toString();
        Path filePath = Paths.get(cwd, "g_species.txt");

        try (BufferedWriter output = new BufferedWriter(new FileWriter(filePath.toFile()))) {
            output.write(g_species);
        }

        return g_species;
    }

    /**
     * Registers the shutdown hook and runs the poll loop until termination is requested.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    terminate = true;
                    System.out.println("ActivityWorker is waiting for the current poll request to return before shutting down.");
                    waitForTermination.await(60, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                    // Restore the interrupt status so it is not silently lost.
                    Thread.currentThread().interrupt();
                }
            }
        });
        try {
            pollAndExecute();
        }
        finally {
            // Let the shutdown hook proceed whether the loop ended normally or with an exception.
            waitForTermination.countDown();
        }
    }

    /**
     * Polls for activity tasks until {@code terminate} is set. Each task that carries a task
     * token is executed and answered with either its result or the failure's class name and
     * message; poll results without a task token are skipped.
     */
    public static void pollAndExecute() {
        while (!terminate) {
            System.out.println("ActivityWorker is polling for an activity task from the tasklist '"
                    + Constants.TASKLIST + "' in the domain '" + Constants.DOMAIN + "'.");

            ActivityTask task = swf.pollForActivityTask(new PollForActivityTaskRequest()
                .withDomain(Constants.DOMAIN)
                .withTaskList(new TaskList().withName(Constants.TASKLIST)));

            String taskToken = task.getTaskToken();

            if (taskToken != null) {
                String result = null;
                Throwable error = null;

                try {
                    System.out.println("ActivityWorker is executing the activity task with input '" + task.getInput() + "'.");
                    result = executeActivityTask(task.getInput());
                }
                catch (Throwable th) {
                    // Captured rather than rethrown so the failure can be reported to SWF below.
                    error = th;
                }

                if (error == null) {
                    System.out.println("The activity task succeeded with result '" + result + "'.");
                    swf.respondActivityTaskCompleted(
                        new RespondActivityTaskCompletedRequest()
                            .withTaskToken(taskToken)
                            .withResult(result));
                }
                else {
                    System.out.println("The activity task failed with the error '"
                            + error.getClass().getSimpleName() + "'.");
                    swf.respondActivityTaskFailed(
                        new RespondActivityTaskFailedRequest()
                            .withTaskToken(taskToken)
                            .withReason(error.getClass().getSimpleName())
                            .withDetails(error.getMessage()));
                }
            }
        }
    }
}

WorkflowStarter 启动一切:

/**
 * Command-line entry point that starts one execution of the hello_sample workflow.
 */
public class WorkflowStarter {
    private static final AmazonSimpleWorkflow swf = AmazonSimpleWorkflowClientBuilder.defaultClient();
    public static final String WORKFLOW_EXECUTION = "HelloWorldWorkflowExecution";

    /**
     * Starts the workflow execution and prints its run id.
     *
     * @param args optional; the first argument, when present, replaces the default workflow
     *             input {@code "Amazon SWF"}
     */
    public static void main(String[] args) {
        final String input = (args.length > 0) ? args[0] : "Amazon SWF";

        System.out.println("Starting the workflow execution '" + WORKFLOW_EXECUTION +
                "' with input '" + input + "'.");

        WorkflowType workflowType = new WorkflowType()
            .withName(Constants.WORKFLOW)
            .withVersion(Constants.WORKFLOW_VERSION);

        StartWorkflowExecutionRequest startRequest = new StartWorkflowExecutionRequest()
            .withDomain(Constants.DOMAIN)
            .withWorkflowType(workflowType)
            .withWorkflowId(WORKFLOW_EXECUTION)
            .withInput(input)
            .withExecutionStartToCloseTimeout("90");

        Run started = swf.startWorkflowExecution(startRequest);

        System.out.println("Workflow execution started with the run id '" +
                started.getRunId() + "'.");
    }
}

【问题讨论】:

    标签: java amazon-web-services amazon-swf


    【解决方案1】:

    我建议不要重新发明轮子,而是使用亚马逊官方支持的 AWS Flow Framework for Java。它已经实现了所有底层细节,让您可以直接专注于工作流的业务逻辑。

    这是一个使用三个活动的示例工作流(取自developer guide)。

    活动接口:

    import com.amazonaws.services.simpleworkflow.flow.annotations.Activities;
    import com.amazonaws.services.simpleworkflow.flow.annotations.ActivityRegistrationOptions;
    
    /**
     * Activities invoked by the greeter workflow, registered as version 1.0 with the default
     * timeouts given in the registration options below.
     */
    @ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 300,
                                 defaultTaskStartToCloseTimeoutSeconds = 10)
    @Activities(version="1.0")
    public interface GreeterActivities {
       /** Supplies the name that should be greeted. */
       String getName();

       /** Builds the greeting text for {@code name}. */
       String getGreeting(String name);

       /** Outputs the text {@code what}. */
       void say(String what);
    }
    

    活动实现:

    /**
     * Straightforward implementation of {@link GreeterActivities}: greets a fixed name and
     * prints to standard output.
     */
    public class GreeterActivitiesImpl implements GreeterActivities {
       /** The name handed out by {@link #getName()}. */
       private static final String NAME = "World";

       /** Returns the fixed name {@code "World"}. */
       @Override
       public String getName() {
          return NAME;
       }

       /** Returns {@code "Hello "} followed by {@code name}. */
       @Override
       public String getGreeting(String name) {
          String greeting = "Hello " + name;
          return greeting;
       }

       /** Prints {@code what} as one line on standard output. */
       @Override
       public void say(String what) {
          System.out.println(what);
       }
    }
    

    工作流接口:

    import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
    import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
    import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
    
    /**
     * Workflow contract for the greeter example; registered with a default execution
     * start-to-close timeout of 3600 seconds.
     */
    @Workflow
    @WorkflowRegistrationOptions(defaultExecutionStartToCloseTimeoutSeconds = 3600)
    public interface GreeterWorkflow {
       /** Workflow entry point (version 1.0). */
       @Execute(version = "1.0")
       void greet();
    }
    

    工作流实现:

    import com.amazonaws.services.simpleworkflow.flow.core.Promise;
    
    public class GreeterWorkflowImpl implements GreeterWorkflow {
       private GreeterActivitiesClient operations = new GreeterActivitiesClientImpl();
    
       public void greet() {
         Promise<String> name = operations.getName();
         Promise<String> greeting = operations.getGreeting(name);
         operations.say(greeting);
       }
    }
    

    同时托管活动和工作流的 worker。显然,它也可以拆分为单独的活动 worker 和工作流 worker:

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
    import com.amazonaws.services.simpleworkflow.flow.ActivityWorker;
    import com.amazonaws.services.simpleworkflow.flow.WorkflowWorker;
    
    /**
     * Hosts both the activity worker and the workflow worker of the greeter example in a single
     * process.
     */
    public class GreeterWorker  {
       /**
        * Creates the SWF client, then starts an activity worker and a workflow worker that poll
        * the same task list in the same domain.
        *
        * @param args unused
        * @throws Exception if setting up or starting a worker fails
        */
       public static void main(String[] args) throws Exception {
         AmazonSimpleWorkflow swfClient = createClient();

         String swfDomain = "helloWorldWalkthrough";
         String taskList = "HelloWorldList";

         ActivityWorker activityWorker = new ActivityWorker(swfClient, swfDomain, taskList);
         activityWorker.addActivitiesImplementation(new GreeterActivitiesImpl());
         activityWorker.start();

         WorkflowWorker workflowWorker = new WorkflowWorker(swfClient, swfDomain, taskList);
         workflowWorker.addWorkflowImplementationType(GreeterWorkflowImpl.class);
         workflowWorker.start();
       }

       /**
        * Builds the SWF client with a 70 second socket timeout, credentials read from the
        * {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_KEY} environment variables, and the
        * us-east-1 endpoint.
        */
       private static AmazonSimpleWorkflow createClient() {
         ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70*1000);

         String accessId = System.getenv("AWS_ACCESS_KEY_ID");
         String secretKey = System.getenv("AWS_SECRET_KEY");
         AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);

         AmazonSimpleWorkflow client = new AmazonSimpleWorkflowClient(credentials, clientConfig);
         client.setEndpoint("https://swf.us-east-1.amazonaws.com");
         return client;
       }
    }
    

    工作流启动器:

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.AWSCredentials;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
    import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflowClient;
    
    /**
     * Starter for the greeter example: creates an external workflow client and triggers one
     * workflow execution.
     */
    public class GreeterMain {

       /**
        * Creates the SWF client and starts the greeter workflow under the workflow id
        * {@code "someID"}.
        *
        * @param args unused
        * @throws Exception if the workflow cannot be started
        */
       public static void main(String[] args) throws Exception {
         AmazonSimpleWorkflow swfClient = createClient();

         String swfDomain = "helloWorldWalkthrough";

         GreeterWorkflowClientExternalFactory clientFactory =
             new GreeterWorkflowClientExternalFactoryImpl(swfClient, swfDomain);
         GreeterWorkflowClientExternal workflowClient = clientFactory.getClient("someID");
         workflowClient.greet();
       }

       /**
        * Builds the SWF client with a 70 second socket timeout, credentials read from the
        * {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_KEY} environment variables, and the
        * us-east-1 endpoint.
        */
       private static AmazonSimpleWorkflow createClient() {
         ClientConfiguration clientConfig = new ClientConfiguration().withSocketTimeout(70*1000);

         String accessId = System.getenv("AWS_ACCESS_KEY_ID");
         String secretKey = System.getenv("AWS_SECRET_KEY");
         AWSCredentials credentials = new BasicAWSCredentials(accessId, secretKey);

         AmazonSimpleWorkflow client = new AmazonSimpleWorkflowClient(credentials, clientConfig);
         client.setEndpoint("https://swf.us-east-1.amazonaws.com");
         return client;
       }
    }
    

    【讨论】:

    • 我使用的示例代码可能很旧,它既没有使用 Promise 也没有使用 @Activities
    • 然后在您的问题中添加更多信息。您使用的是什么语言和框架?你的代码怎么样?
    • 我正在使用 Java 和 com.amazonaws.services.simpleworkflow
    • 您的工作流程和活动实施情况如何?这个包确实包含 Promise 和 @Activities 注释。
    • 有没有办法附上文件或者我可以私下联系你?无论如何谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-01-27
    • 1970-01-01
    • 1970-01-01
    • 2016-09-10
    • 2018-05-20
    • 2016-10-28
    • 2023-04-04
    相关资源
    最近更新 更多