【发布时间】:2020-02-15 14:41:47
【问题描述】:
我试图了解以下代码是否是线程安全的,它是由另一个开发人员编写的,我继承了他的代码并且不再与我们同在。
我有一个 BaseProvider 类,它实际上是一个消息缓存,由 LinkedBlockingQueue 表示。此类将传入消息存储在队列中。
我有一组读取这个队列的工作线程。因此,LinkedBlockingQueue 是线程安全的。
问题 1.当工作线程调用provider.getNextQueuedItem()时,provider会逐项遍历,加入到一个列表中,返回消息列表。在执行此操作时,如果通过调用 addToQueue 将消息添加到提供程序类会发生什么情况? LinkedBlockingQueue 内部的 takeLock 是否会阻止将新消息添加到队列中,直到所有消息都从队列中取出?
-
您会注意到,每个工作线程都可以访问所有提供程序,因此当一个工作线程遍历所有提供程序并调用 getNextQueuedItem() 时,当另一个工作线程也调用所有提供程序并调用 getNextQueuedItem 时会发生什么()?两个工作线程是否会相互交叉?
公共抽象类 BaseProvider 实现 IProvider {
private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>(); @Override public synchronized List<CoreMessage> getNextQueuedItem() { List<CoreMessage> arrMessages = new ArrayList<CoreMessage>(); if (internalQueue.size() > 0) { Logger.debug("Queue has entries"); CoreMessage msg = null; try { msg = internalQueue.take(); } catch (InterruptedException e) { Logger.warn("Interruption"); e.printStackTrace(); } if (msg != null) { arrMessages.add(msg); } } return arrMessages; } protected synchronized void addToQueue(CoreMessage message) { try { internalQueue.put(message); } catch (InterruptedException e) { Logger.error("Exception adding message to queue " + message); } }}
// 有一组工作线程读取这些队列
public class Worker implements Runnable
@Override
public void run() {
Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());
while (!stopRequested) {
boolean processedMessage = false;
for (IProvider provider : providers) {
List<CoreMessage> messages = provider.getNextQueuedItem();
if (messages == null || messages.size() != 0) {
processedMessage = true;
for (CoreMessage message : messages) {
final Message msg = createEndurMessage(provider, message);
processMessage(msg);
message.commit();
}
}
}
if (!(processedMessage || stopRequested)) {
// this is to stop the thread from spinning when there are no messages
try {
Thread.sleep(WAIT_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
【问题讨论】:
-
出于好奇,为什么
getNextQueuedItem()返回一条消息的列表?为什么它不简单地返回消息? -
Re,
if (messages == null || messages.size() != 0) ...不清楚作者在这里试图说什么,但这没关系,因为messages永远不可能是null:getNextQueueItem()方法总是 返回一个List,它可能包含零项或一项。此外,可能没有理由明确测试messages列表的大小,因为for (...)循环无论如何都会对其进行测试。 -
Re,“当工作线程调用
provider.getNextQueuedItem()时,提供者会逐项检查......”这似乎不是真的。在您的示例中,getNextQueuedItem()的实现除了从internalQueue中获取 一个 项之外什么都不做,然后它返回包裹在new ArrayList中的单个项。
标签: java multithreading thread-safety