【问题标题】:Synchronization problem in JAVA multithreading environment together with stream interfaceJAVA多线程环境与流接口的同步问题
【发布时间】:2020-05-13 10:01:31
【问题描述】:

我遇到了一个相当复杂的多线程问题:

  • 我运行一个将数据推送到 activeMQ 消息队列的服务,完全独立于我正在运行的实际应用程序
  • 推送到 ActiveMQ 的数据正在通过 Apache Camel 处理器进行拾取和处理,侦听器可以在其中注册以获得有关新处理器结果的通知。
  • 实际应用程序是 Camel 处理器的侦听器。它通过 stream() 接口驱动进程,在每个处理步骤启动新线程。

目标是单独线程中的代码等待 ActiveMq 处理器的结果。我相信我必须使用一些线程锁定或闩锁,但我没有足够的 JAVA 多线程经验来完成这项工作。

处理类代码:

public class MyApplication {

    CallingInstanceIF _callingInstance = null;
    ActiveMQConnector _activemqConector = null;
    Data _dataFromApacheCamelProcessor = null;

    public MyApplication(CallingInstanceIF callingInstance) throws Exception {

        _callingInstance = callingInstance;

        // start apache camel route to ActiveMQ server. 
        //This pushes data at arrival to method doOnDataArrival() below
        try {
            _activemqConnector = new ActiveMQConnector(this);
            _activemqConnector.startConnections();
        } catch (Exception e) {
            ...
        }
    }

    private ResultDataStructure methodExecutedInSeparateThread(inputData) {

            _doSomethingLatch = new CountDownLatch(1);
            _callingInstance.setCountDownLatch(_doSomethingLatch);
            ResultDataStructure output = new ResultDataStructure();

            try {
               // do something in calling instance. 
               // This is done synchronously in other thread
                resultOfDoSomething = _callingInstance.doSomething(inputData);
            } catch (final Exception ex) {
                ...
            }

            // wait for _callingInstance to 
            // finish --> _callingInstance lifts _doSomethingLatch (working code up to here)
            try {
                _doSomethingLatch.await();
            } catch (final InterruptedException ex) {
                ...
            }

            output.add(resultOfDoSomething);

            // Here is where I have problems: 
            //this method shall for result arriving from method doOnDataArrival() below

            waitForCamelResult();
            output.add(_dataFromApacheCamelProcessor);


        return output

    }

    public void initAlgorithm(final Object scanParameters) throws Exception {

                _scannerEngine = Engine.builder(this::methodExecutedInOtherThread, inputData) //
                        .executor(Executors.newSingleThreadExecutor()) // Executor kicks off "methodExecutedInSeparateThread()" above
                        .build();

        }
    }

    public void mainProcessingMethod(final BiConsumer<...> callback) throws Exception {

        final Thread processingThread = new Thread(() -> {

            try {
                _scannerEngine.stream() //
                        .limit(... limit criteria) //
                        .forEach(processResult -> {
                            waiting();
                            if (callback != null) {
                                    callback.accept(evolutionResult, best.max());
                                }
                            }
                        });

            } catch (final Exception ex) {
                ...
            }

            _callingInstance.doOnAlgorithmFinished();
        });

        processingThread.start();
        _engineStream = processingThread;
    }    

    public void doOnDataArrival(dataFromApacheCamelProcessor) {

        _dataBuffer = dataFromApacheCamelProcessor;

        shall notify "methodExecutedIn
    }
}

【问题讨论】:

    标签: java multithreading java-stream


    【解决方案1】:

    我无法参考上面的代码,但据我了解,有两个 java 构造可能会对您有所帮助。

    CompletableFuture (Tutorial) 如果您需要发送线程并等待单个响应,可以为您提供帮助。使用supplyAsyncget

    或者,如果您需要使用一个实例传送多个“镜头”,您可以使用SynchronousQueue (Tutorial)(阻塞队列。)。基本上是调用puttake

    【讨论】:

    • 嗨 Ben,我设法使用静态 CoundownLatch 实现了我需要的功能,它在方法 ExecutedInSeparateThread() 中实例化并在 doOnDataArrival() 中倒计时。这行得通,也许它不是很优雅......与 Futures 一起玩也很好,但这更难使用。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-02-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多