【发布时间】:2020-05-13 10:01:31
【问题描述】:
我遇到了一个相当复杂的多线程问题:
- 我运行一个将数据推送到 activeMQ 消息队列的服务,完全独立于我正在运行的实际应用程序
- 推送到 ActiveMQ 的数据正在通过 Apache Camel 处理器进行拾取和处理,侦听器可以在其中注册以获得有关新处理器结果的通知。
- 实际应用程序是 Camel 处理器的侦听器。它通过 stream() 接口驱动进程,在每个处理步骤启动新线程。
目标是单独线程中的代码等待 ActiveMq 处理器的结果。我相信我必须使用一些线程锁定或闩锁,但我没有足够的 JAVA 多线程经验来完成这项工作。
处理类代码:
public class MyApplication {
CallingInstanceIF _callingInstance = null;
ActiveMQConnector _activemqConector = null;
Data _dataFromApacheCamelProcessor = null;
public MyApplication(CallingInstanceIF callingInstance) throws Exception {
_callingInstance = callingInstance;
// start apache camel route to ActiveMQ server.
//This pushes data at arrival to method doOnDataArrival() below
try {
_activemqConnector = new ActiveMQConnector(this);
_activemqConnector.startConnections();
} catch (Exception e) {
...
}
}
private ResultDataStructure methodExecutedInSeparateThread(inputData) {
_doSomethingLatch = new CountDownLatch(1);
_callingInstance.setCountDownLatch(_doSomethingLatch);
ResultDataStructure output = new ResultDataStructure();
try {
// do something in calling instance.
// This is done synchronously in other thread
resultOfDoSomething = _callingInstance.doSomething(inputData);
} catch (final Exception ex) {
...
}
// wait for _callingInstance to
// finish --> _callingInstance lifts _doSomethingLatch (working code up to here)
try {
_doSomethingLatch.await();
} catch (final InterruptedException ex) {
...
}
output.add(resultOfDoSomething);
// Here is where I have problems:
//this method shall for result arriving from method doOnDataArrival() below
waitForCamelResult();
output.add(_dataFromApacheCamelProcessor);
return output
}
public void initAlgorithm(final Object scanParameters) throws Exception {
_scannerEngine = Engine.builder(this::methodExecutedInOtherThread, inputData) //
.executor(Executors.newSingleThreadExecutor()) // Executor kicks off "methodExecutedInSeparateThread()" above
.build();
}
}
public void mainProcessingMethod(final BiConsumer<...> callback) throws Exception {
final Thread processingThread = new Thread(() -> {
try {
_scannerEngine.stream() //
.limit(... limit criteria) //
.forEach(processResult -> {
waiting();
if (callback != null) {
callback.accept(evolutionResult, best.max());
}
}
});
} catch (final Exception ex) {
...
}
_callingInstance.doOnAlgorithmFinished();
});
processingThread.start();
_engineStream = processingThread;
}
public void doOnDataArrival(dataFromApacheCamelProcessor) {
_dataBuffer = dataFromApacheCamelProcessor;
shall notify "methodExecutedIn
}
}
【问题讨论】:
标签: java multithreading java-stream