【Question title】: Spring state machine nested machines running synchronously
【Posted】: 2016-04-25 06:40:28
【Question】:

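I am using Spring State Machine and trying to configure some nested functionality. Essentially, I am trying to run two processes as individual machines nested within a single state. I have the following code for the states and transitions: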

public enum States {
READY, FORK, JOIN, TASKS, TERMINATE,

T1_INIT, C1_INIT, T1_READY, T1_POLL, T1_PROCESS, T1_STORE, T1_DELAY, C1_CONTINUE, T1_TERMINATE,
T2_INIT, C2_INIT, T2_READY, T2_POLL, T2_PROCESS, T2_STORE, T2_DELAY, C2_CONTINUE, T2_TERMINATE }

public enum Events {INITIALIZE, RUN, STOP, FALLBACK, CONTINUE, FIX, PROC_COMPLETE}
@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
        throws Exception {
    states
            .withStates()
            .initial(States.READY)
            .fork(States.FORK)
            .state(States.TASKS)
            .join(States.JOIN)
            .end(States.TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T1_INIT, initT1Action())
            .state(States.T1_READY)
            .state(States.T1_POLL, pollT1Action(), null)
            .state(States.T1_PROCESS, processT1Action(), null)
            .state(States.T1_STORE, storeT1Action(), null)
            .state(States.T1_DELAY)
            .choice(States.C1_CONTINUE)
            .end(States.T1_TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T2_INIT, initT2Action())
            .state(States.T2_READY)
            .state(States.T2_POLL, pollT2Action(), null)
            .state(States.T2_PROCESS, processT2Action(), null)
            .state(States.T2_STORE, storeT2Action(), null)
            .state(States.T2_DELAY)
            .choice(States.C2_CONTINUE)
            .end(States.T2_TERMINATE);

}

@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
        throws Exception {
    transitions
            .withExternal()
            .source(States.READY).target(States.FORK)
            .and()
            .withFork()
            .source(States.FORK).target(States.TASKS)
            .and()
            .withExternal()
            .source(States.T1_INIT).target(States.T1_READY)
            .and()
            .withExternal()
            .source(States.T1_READY).target(States.T1_POLL)
            .and()
            .withExternal()
            .source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
            .and()
            .withExternal()
            .source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
            .and()
            .withExternal()
            .source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
            .and()
            .withExternal()
            .source(States.T1_DELAY).target(States.C1_CONTINUE)
            .and()
            .withChoice()
            .source(States.C1_CONTINUE)
            .first(States.T1_POLL, continueChoiceT1Guard())
            .last(States.T1_TERMINATE)
            .and()
            .withExternal()
            .source(States.T2_INIT).target(States.T2_READY)
            .and()
            .withExternal()
            .source(States.T2_READY).target(States.T2_POLL)
            .and()
            .withExternal()
            .source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
            .and()
            .withExternal()
            .source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
            .and()
            .withExternal()
            .source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
            .and()
            .withExternal()
            .source(States.T2_DELAY).target(States.C2_CONTINUE)
            .and()
            .withChoice()
            .source(States.C2_CONTINUE)
            .first(States.T2_POLL, continueChoiceT2Guard())
            .last(States.T2_TERMINATE)
            .and()
            .withJoin()
            .source(States.TASKS).target(States.JOIN)
            .and()
            .withExternal()
            .source(States.JOIN).target(States.TERMINATE);

}

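I expect the machine to fork into the two machines defined in the TASKS state and run them as separate machines. The T1 and T2 actions are set up with a delay in each state within TASKS: T1 delays for 1 second and T2 for half a second.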

What I find is that T1 runs through all of its states and terminates before T2 even starts. Any ideas on how to get them running properly via fork/join?

【Question comments】:

    Tags: java spring state-machine spring-statemachine


    【Answer 1】:

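    You haven't shown the full configuration; at the very least, the taskExecutor is missing from your code sample. For example, the tasks sample, which does fork/join, uses a ThreadPoolTaskExecutor. The default executor is SyncTaskExecutor. The documentation has some notes on how it is configured.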

    https://github.com/spring-projects/spring-statemachine/blob/master/spring-statemachine-samples/tasks/src/main/java/demo/tasks/Application.java#L189
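
Why the executor matters can be illustrated without Spring at all. The sketch below is plain `java.util.concurrent` code, not Spring State Machine code: `Runnable::run` is used as a stand-in for a synchronous executor (one that runs each task in the calling thread, as SyncTaskExecutor does), and the class and method names (`ExecutorDemo`, `regionsOverlap`) are made up for this illustration. Each simulated region waits briefly for the other one to start; that can only succeed if both are running at the same time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ExecutorDemo {

    /**
     * Submits two simulated regions to the executor and reports whether
     * they were ever running at the same time.
     */
    public static boolean regionsOverlap(Executor executor) throws InterruptedException {
        CountDownLatch bothStarted = new CountDownLatch(2);
        CountDownLatch bothDone = new CountDownLatch(2);
        AtomicBoolean overlapped = new AtomicBoolean(true);

        for (String region : new String[] {"T1", "T2"}) {
            executor.execute(() -> {
                System.out.println(region + " started on " + Thread.currentThread().getName());
                bothStarted.countDown();
                try {
                    // Succeeds only if the other region starts while this one is still running.
                    if (!bothStarted.await(500, TimeUnit.MILLISECONDS)) {
                        overlapped.set(false);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    overlapped.set(false);
                } finally {
                    bothDone.countDown();
                }
            });
        }
        bothDone.await(5, TimeUnit.SECONDS);
        return overlapped.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a synchronous executor: each task runs inline on the caller.
        Executor sync = Runnable::run;
        System.out.println("sync executor overlaps: " + regionsOverlap(sync));

        // A thread pool gives each region its own worker thread.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("thread pool overlaps: " + regionsOverlap(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the inline executor T1 has to finish before T2 is even submitted, which matches the symptom in the question; with the pool both regions are in flight together. The printed thread names are also a quick way to check which threads actually execute the work.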

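    Another thing in your code sample is that your transitions use no events at all, which means you will not be able to drive the machine. You have defined the transition from READY to FORK without an event, which makes it a triggerless transition. When you start the state machine, it enters the initial state READY, and the anonymous transition then takes it straight to FORK. This startup sequence happens during start and outside of the actual executor.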

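    Try defining the transition from READY to FORK with an event, then send that event to the machine after it has started; parallel execution should then work (assuming the taskExecutor is set up correctly).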
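
The suggestion above could be sketched roughly as follows. This is a fragment rather than a complete application: it assumes the `States` and `Events` enums from the question, picks `Events.RUN` arbitrarily as the trigger, and uses the hypothetical class names `EventDrivenForkConfig` and `MachineLauncher`. It relies on the `event(...)` transition option and on `StateMachine.start()` / `sendEvent(...)` as available in the Spring State Machine releases of that period.

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;

// Fragment: only the READY -> FORK transition differs from the question.
// States and Events are the enums defined in the question.
@Configuration
@EnableStateMachine
public class EventDrivenForkConfig extends EnumStateMachineConfigurerAdapter<States, Events> {

    @Override
    public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
            throws Exception {
        transitions
                .withExternal()
                .source(States.READY).target(States.FORK)
                .event(Events.RUN) // previously triggerless; now fires only when RUN is sent
                .and()
                .withFork()
                .source(States.FORK).target(States.TASKS);
        // The remaining T1/T2, choice and join transitions stay as in the question.
    }
}

// Hypothetical helper showing how the machine is driven once it is running.
class MachineLauncher {

    void launch(StateMachine<States, Events> stateMachine) {
        stateMachine.start();               // enters READY and stays there
        stateMachine.sendEvent(Events.RUN); // READY -> FORK now goes through the executor
    }
}
```

The point of the change is that the fork is no longer taken as part of the startup sequence, so the regions are entered through normal event processing, where the configured taskExecutor is used.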

    【Comments】:

      【Answer 2】:

      Actually, the code for the whole FSM configuration class is as follows:

      @Configuration
      @EnableStateMachine
      @WithStateMachine
      public class FSMFactoryConfig extends EnumStateMachineConfigurerAdapter<States, Events> {
      
      @Autowired
      StateMachine<States, Events> stateMachine;
      
      ArrayList<IDataProcessor> dp = new ArrayList<>();
      
      private void createProcessors() {
          dp.add(0, (IDataProcessor) new DataProcessor());
          dp.add(1, (IDataProcessor) new DataProcessor2());
      }
      
      @Override
      public void configure(StateMachineConfigurationConfigurer<States, Events> config)
              throws Exception {
      
          createProcessors();
      
          config
                  .withConfiguration()
                  .autoStartup(false)
                  .taskExecutor(taskExecutor())
                  .taskScheduler(new ConcurrentTaskScheduler())
                  .listener(new StateMachineEventListener());
      }
      
      @Override
      public void configure(StateMachineStateConfigurer<States, Events> states)
              throws Exception {
          states
                  .withStates()
                  .initial(States.READY)
                  .fork(States.FORK)
                  .state(States.TASKS)
                  .join(States.JOIN)
                  .end(States.TERMINATE)
                  .and()
                  .withStates()
                  .parent(States.TASKS)
                  .initial(States.T1_INIT, initT1Action())
                  .state(States.T1_READY)
                  .state(States.T1_POLL, pollT1Action(), null)
                  .state(States.T1_PROCESS, processT1Action(), null)
                  .state(States.T1_STORE, storeT1Action(), null)
                  .state(States.T1_DELAY)
                  .choice(States.C1_CONTINUE)
                  .end(States.T1_TERMINATE)
                  .and()
                  .withStates()
                  .parent(States.TASKS)
                  .initial(States.T2_INIT, initT2Action())
                  .state(States.T2_READY)
                  .state(States.T2_POLL, pollT2Action(), null)
                  .state(States.T2_PROCESS, processT2Action(), null)
                  .state(States.T2_STORE, storeT2Action(), null)
                  .state(States.T2_DELAY)
                  .choice(States.C2_CONTINUE)
                  .end(States.T2_TERMINATE);
      
      }
      
      @Override
      public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
              throws Exception {
          transitions
                  .withExternal()
                  .source(States.READY).target(States.FORK)
                  .and()
                  .withFork()
                  .source(States.FORK).target(States.TASKS)
                  .and()
                  .withExternal()
                  .source(States.T1_INIT).target(States.T1_READY)
                  .and()
                  .withExternal()
                  .source(States.T1_READY).target(States.T1_POLL)
                  .and()
                  .withExternal()
                  .source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
                  .and()
                  .withExternal()
                  .source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
                  .and()
                  .withExternal()
                  .source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
                  .and()
                  .withExternal()
                  .source(States.T1_DELAY).target(States.C1_CONTINUE)
                  .and()
                  .withChoice()
                  .source(States.C1_CONTINUE)
                  .first(States.T1_POLL, continueChoiceT1Guard())
                  .last(States.T1_TERMINATE)
                  .and()
                  .withExternal()
                  .source(States.T2_INIT).target(States.T2_READY)
                  .and()
                  .withExternal()
                  .source(States.T2_READY).target(States.T2_POLL)
                  .and()
                  .withExternal()
                  .source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
                  .and()
                  .withExternal()
                  .source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
                  .and()
                  .withExternal()
                  .source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
                  .and()
                  .withExternal()
                  .source(States.T2_DELAY).target(States.C2_CONTINUE)
                  .and()
                  .withChoice()
                  .source(States.C2_CONTINUE)
                  .first(States.T2_POLL, continueChoiceT2Guard())
                  .last(States.T2_TERMINATE)
                  .and()
                  .withJoin()
                  .source(States.TASKS).target(States.JOIN)
                  .and()
                  .withExternal()
                  .source(States.JOIN).target(States.TERMINATE);
      
      }
      
      
      
      @Bean
      public Guard<States, Events> initChoiceGuard() {
          return new Guard<States, Events>() {
      
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  System.out.println("in initChoiceGuard...");
                  return true;
              }
          };
      }
      
      int i1 = 0;
      int i2 = 0;
      
      @Bean
      public Guard<States, Events> continueChoiceT1Guard() {
      
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  System.out.println("in continueChoiceT1Guard - count: " + i1);
                  return (i1++ < 5);
              }
          };
      }
      
      @Bean
      public Guard<States, Events> continueChoiceT2Guard() {
      
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  System.out.println("in continueChoiceT2Guard - count: " + i2);
                  return (i2++ < 10);
              }
          };
      }
      
      @Bean
      public Guard<States, Events> tasksChoiceGuard() {
          return new Guard<States, Events>() {
      
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  System.out.println("in tasksChoiceGuard");
                  return true;
              }
          };
      }
      
      @Bean
      public Action<States, Events> initT1Action() {
          return new Action<States, Events>() {
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in initAction");
                  ((IDataProcessor) dp.get(0)).init();
              }
          };
      }
      
      @Bean
      public Action<States, Events> initT2Action() {
          return new Action<States, Events>() {
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in initAction");
                  ((IDataProcessor) dp.get(1)).init();
              }
          };
      }
      
      @Bean
      public Action<States, Events> pollT1Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in pollAction");
                  ((IDataProcessor) dp.get(0)).poll();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> pollT2Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in pollAction");
                  ((IDataProcessor) dp.get(1)).poll();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> processT1Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in processT1Action");
                  ((IDataProcessor) dp.get(0)).process();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> processT2Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in processT2Action");
                  ((IDataProcessor) dp.get(1)).process();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> storeT1Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in storeT1Action");
                  ((IDataProcessor) dp.get(0)).store();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> storeT2Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in storeT2Action");
                  ((IDataProcessor) dp.get(1)).store();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> resetT1Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in resetT1Action");
                  ((IDataProcessor) dp.get(0)).reset();
      
              }
          };
      }
      
      @Bean
      public Action<States, Events> resetT2Action() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in resetT2Action");
                  ((IDataProcessor) dp.get(1)).reset();
      
              }
          };
      }
      
      @Bean
      public Guard<States, Events> pollT1Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(0)).pollComplete();
              }
          };
      }
      
      @Bean
      public Guard<States, Events> pollT2Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(1)).pollComplete();
              }
          };
      }
      
      @Bean
      public Guard<States, Events> processT1Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(0)).processComplete();
              }
          };
      }
      
      @Bean
      public Guard<States, Events> processT2Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(1)).processComplete();
              }
          };
      }
      
      @Bean
      public Guard<States, Events> storeT1Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(0)).storeComplete();
              }
          };
      }
      
      @Bean
      public Guard<States, Events> storeT2Guard() {
          return new Guard<States, Events>() {
              @Override
              public boolean evaluate(StateContext<States, Events> context) {
                  return ((IDataProcessor) dp.get(1)).storeComplete();
              }
          };
      }
      
      @Bean
      public Action<States, Events> fixAction() {
          return new Action<States, Events>() {
      
              @Override
              public void execute(StateContext<States, Events> context) {
                  System.out.println("in fixAction");
              }
          };
      }
      
      @Bean
      public StateMachine<States, Events> stateMachine() {
          return stateMachine;
      }
      
      @Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
      public TaskExecutor taskExecutor() {
          ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
          te.setMaxPoolSize(50);
          te.setThreadNamePrefix("LULExecutor-");
          te.setCorePoolSize(25);
          te.initialize();
          return te;
      }
      
      //@StatesOnTransition(target = States.AUTOMATIC)
      public void automaticFix(ExtendedState extendedState) {
      
      }
      }
      

      【Comments】:

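      • Did you check from the logs which threads are doing the executing? I'm a bit confused by the fact that you autowire the StateMachine in the config class and then build a bean from that same autowired instance, which I would expect to fail. Using @WithStateMachine on the config class is also a bad idea.
      • I'm new to Spring, so I may well be doing something silly. Thanks for pointing me to better practice. The actions are firing in sequence. I use the continue guards to decide whether the task needs to run again. I want task1 and task2 to run at the same time, but task1 runs for a count of 5, and then task2 runs for a count of 10 after task1 has finished. Given that they are in different regions, I would expect them to loop through their tasks concurrently. Is that not what I should expect?
      • I updated my answer with a possible reason why the parallel execution does not happen.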
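
The two points in the first comment (do not autowire the machine into its own configuration class, and keep @WithStateMachine off that class) might be addressed along these lines. This is only a sketch under assumptions: `FsmConfig` and `TaskTransitionLogger` are hypothetical names, `States` and `Events` are the enums from the question, and the string targets in `@OnTransition` are assumed to match the enum constant names.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.annotation.OnTransition;
import org.springframework.statemachine.annotation.WithStateMachine;
import org.springframework.statemachine.config.EnableStateMachine;
import org.springframework.statemachine.config.EnumStateMachineConfigurerAdapter;

// Configuration only: no @WithStateMachine, no autowired StateMachine field
// and no stateMachine() bean method. @EnableStateMachine provides the machine bean.
@Configuration
@EnableStateMachine
class FsmConfig extends EnumStateMachineConfigurerAdapter<States, Events> {

    // configure(...) overrides, guards and actions as in the answer above

    @Bean
    public TaskTransitionLogger taskTransitionLogger() {
        return new TaskTransitionLogger();
    }
}

// Hypothetical separate bean that reacts to transitions of the machine.
@WithStateMachine
class TaskTransitionLogger {

    @OnTransition(target = "T1_POLL")
    public void onT1Poll() {
        // Printing the thread name shows which thread drives each region.
        System.out.println("T1_POLL on " + Thread.currentThread().getName());
    }

    @OnTransition(target = "T2_POLL")
    public void onT2Poll() {
        System.out.println("T2_POLL on " + Thread.currentThread().getName());
    }
}
```

Code that needs the machine (for example to start it and send events) can then have it injected elsewhere, outside the configuration class.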