【问题标题】:Java EE Multiple SessionBeans with TimerService带有 TimerService 的 Java EE 多个 SessionBean
【发布时间】:2020-12-23 18:43:36
【问题描述】:

我有一个带有 TimerService 的无状态会话 Bean。 超时时,它开始使用 JMS 队列。在处理消息时,它需要访问可能暂时不可用的外部资源。 timeout方法循环调用MessageConsumer.receiveNoWait()直到:

  • 没有更多消息要处理:它注册了一个新计时器 = 现在 + 10 分钟。结束。
  • 处理过程中发生错误:它回滚消息并注册一个新计时器:现在 + 30 分钟。结束。

通过这种方式,我可以控制何时重新启动,并且由于 TimerService 回调,我没有休眠线程。

我希望多次出现此会话 bean 以预测队列中的瓶颈:

                    +-----<ejb>-------+
                    | timerService    |
                    |                 |              +---------------------+
                ----| onTimeout() {}  | -----------> | external dependency |
               /    |                 |         /    +---------------------+ 
              /     +-----------------+        /
             /                                /
+---------+ /                                /
|||queue|||K                                /
+---------+ \                              /
             \      +-----<ejb>-------+   /
              \     | timerService    |  /
               \    |                 | /
                ----| onTimeout() {}  | 
                    |                 |
                    +-----------------+

我的会话 bean 看起来像这样(当然是简化了):

@Stateless
public class MyJob {
    
    @Resource
    private TimerService timerService;

    @PostConstruct
    public void init() {
        registerNewTimer(1000L); // -> problem: timerService not accessible
        System.out.println("Initial Timer created");
    }

    private void registerNewTimer(long duration) {
        TimerConfig config = new TimerConfig();
        config.setPersistent(false);
        timerService.createSingleActionTimer(duration, config);
    }

    @Timeout
    @TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
    public void execute() {
        try {
            // instantiate JMS session and consumer
            while ((message = consumer.receiveNoWait()) != null) {
                // business logic with message
                message.acknowledge();
            }
            // queue empty, let's stop for now and register a new timer + 10min
            registerNewTimer(10*60*1000);
        } catch (ResourceException re) {
            // external resource unavailable, let's wait 30min
            registerNewTimer(30*60*1000);
            // last message not acknowledged, so rolled back
        }
    }
}

我不想使用消息驱动 Bean,因为我想控制何时使用消息(请参阅延迟逻辑以防出现错误)。

问题: 错误在@PostConstruct注解init()方法中:此时不允许使用timerService。当我创建会话bean @Singleton 时允许这样做,但随后我失去了并行处理队列的可能性。有谁知道如何解决这个问题?如果 TimerService 不是正确的机制,还有什么可以替代的。是否有 PostConstruct 替代方案允许访问引用的资源并且仅在实例化后调用一次?

提前感谢您提供任何建设性信息。

【问题讨论】:

    标签: java jakarta-ee weblogic12c


    【解决方案1】:

    我自己找到了解决方案,所以我将其发布在此处,以便对其他人有所帮助。

    我添加了一个新的 Singleton,Startup bean,它包含所有实现 MyJobExecutor 接口的 bean 的列表,这要归功于 CDI。在 JEE 环境中,CDI 可以很好地与 EJB 配合使用,因此它会注入会话 bean!警告:CDI 只会注入直接实现 MyJobExecutor 的类,因此如果你有一个实现 MyJobExecutor 的抽象类和一个从这个抽象类扩展的具体 bean,它就不起作用。它必须明确地implements MyJobExecutor。 在启动类的 postconstruct 中,我调用每个 MyJobExecutor bean 在其 TimerService 中注册一个新计时器。这样我就可以为每个会话 bean 创建第一个 singleActionTimer(s)。

    public interface MyJobExecutor {
        // how many timers to register for this bean meaning how many parallel processes.
        public int nrOfParallellInstances();
    }
    
    import javax.annotation.PostConstruct;
    import javax.ejb.EJB;
    import javax.ejb.Singleton;
    import javax.ejb.Startup;
    import javax.enterprise.inject.Any;
    import javax.enterprise.inject.Instance;
    import javax.inject.Inject;
    
    @Singleton
    @Startup
    public class MyJobStartup {
    
        @Inject
        @Any
        private Instance<MyJobExecutor> myJobs;
        
        @PostConstruct
        public void startIt() {
            for (MyJobExecutor ajob : myJobs) {
                for(int i=0; i<ajob.nrOfParallellInstances(); i++)
                    // register every instance with a 1 second delay between each timeout
                    ajob.registerNewTimer(1000L*(i+1));
            }
        }
    }
    

    内部循环将根据ajob.nrOfParallellInstances() 值为同一个bean 注册多个计时器。 但是,当前一个超时仍在运行时,TimerService 不会触发超时。这会按预期阻止并行处理:(

    为了解决这个问题,我调整了 timeout 方法,使其不执行业务逻辑本身,而是启动一个执行业务逻辑的托管线程。这样onTimeout 方法会快速结束,并且计时器服务将启动下一次超时,从而导致多个并行执行(每个都在一个托管线程中):

    import javax.enterprise.concurrent.ManagedThreadFactory;
    import javax.naming.InitialContext;
    
    @Resource(name = "concurrent/AsyncJobThreadFactory")
    private ManagedThreadFactory threadFactory;
    
    @Timeout
    private void onTimeout(Timer timer) {
        LOG.info("Timeout triggered for " + timer.getInfo());
        Thread thread = threadFactory.newThread(new AsyncExecRunnable((String) timer.getInfo()));
        if (thread != null) {
            thread.start();
        } else {
            LOG.warn("No thread available for this job. Retry in 5 minutes.");
            registerNewTimer(1000L * 60 * 5);
        }
    
    }
    private static class AsyncExecRunnable implements Runnable {
    
        private String extra info;
        
        public AsyncExecRunnable(String info) {
            this.info = info;
        }
    
        @Override
        public void run() {
            try {
                LOG.info("Job executor thread started for " + info);
                InitialContext ic=new InitialContext();
                // business logic. With ic you can lookup other EJBs
            } catch (Exception e) {
                LOG.error("Problem executing AsyncJobExecutor:", e);
            }
        }
    }
    

    有了这个设置,我有:

    • 在部署时为所有实现 MyJobExecutor 接口的 Session bean 自动注册第一个计时器。
    • 可以同时注册多个定时器,它们将并行运行。
    • 通过使用 ManagedThreadFactory,线程也可以访问 JNDI 上下文并可以查找 EJB。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-11-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-23
      • 2015-04-29
      相关资源
      最近更新 更多