【问题标题】:LinkedBlockingQueue and worker threads - is this code thread safe?LinkedBlockingQueue 和工作线程 - 这个代码线程安全吗?
【发布时间】:2020-02-15 14:41:47
【问题描述】:

我试图了解以下代码是否是线程安全的,它是由另一个开发人员编写的,我继承了他的代码并且不再与我们同在。

我有一个 BaseProvider 类,它实际上是一个消息缓存,由 LinkedBlockingQueue 表示。此类将传入消息存储在队列中。

我有一组读取这个队列的工作线程。因此,LinkedBlockingQueue 是线程安全的。

问题 1.当工作线程调用provider.getNextQueuedItem()时,provider会逐项遍历,加入到一个列表中,返回消息列表。在执行此操作时,如果通过调用 addToQueue 将消息添加到提供程序类会发生什么情况? LinkedBlockingQueue 内部的 takeLock 是否会阻止将新消息添加到队列中,直到所有消息都从队列中取出?

  1. 您会注意到,每个工作线程都可以访问所有提供程序,因此当一个工作线程遍历所有提供程序并调用 getNextQueuedItem() 时,当另一个工作线程也调用所有提供程序并调用 getNextQueuedItem 时会发生什么()?两个工作线程是否会相互交叉?

    公共抽象类 BaseProvider 实现 IProvider {

     private LinkedBlockingQueue<CoreMessage> internalQueue = new LinkedBlockingQueue<CoreMessage>();
    
    @Override
    public synchronized List<CoreMessage> getNextQueuedItem() {
        List<CoreMessage> arrMessages = new ArrayList<CoreMessage>();                    
        if (internalQueue.size() > 0) {
            Logger.debug("Queue has entries");    
            CoreMessage msg = null;
            try {
                msg = internalQueue.take();
            } catch (InterruptedException e) {
                Logger.warn("Interruption");
                e.printStackTrace();
            }
            if (msg != null) {
                arrMessages.add(msg);
            }
        }
        return arrMessages;
    }
    
    protected synchronized void addToQueue(CoreMessage message) {
        try {
            internalQueue.put(message);
        } catch (InterruptedException e) {
            Logger.error("Exception adding message to queue " + message);
        }
    }
    

    }

// 有一组工作线程读取这些队列

public class Worker implements Runnable 
 @Override
 public void run() {
    Logger.info("Worker - Running Thread : " + Thread.currentThread().getName());

    while (!stopRequested) {
        boolean processedMessage = false;
        for (IProvider provider : providers) {
            List<CoreMessage> messages = provider.getNextQueuedItem();
            if (messages == null || messages.size() != 0) {
                processedMessage = true;
                for (CoreMessage message : messages) {
                    final Message msg = createEndurMessage(provider, message);
                    processMessage(msg);
                    message.commit();
                }
            }
        }
        if (!(processedMessage || stopRequested)) {
            // this is to stop the thread from spinning when there are no messages
            try {
                Thread.sleep(WAIT_INTERVAL);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

}

【问题讨论】:

  • 出于好奇,为什么getNextQueuedItem() 返回一条消息的列表?为什么它不简单地返回消息?
  • Re, if (messages == null || messages.size() != 0) ... 不清楚作者在这里试图说什么,但这没关系,因为messages 永远不可能是nullgetNextQueueItem() 方法总是 返回一个List,它可能包含零项或一项。此外,可能没有理由明确测试messages 列表的大小,因为for (...) 循环无论如何都会对其进行测试。
  • Re,“当工作线程调用provider.getNextQueuedItem() 时,提供者会逐项检查......”这似乎不是真的。在您的示例中,getNextQueuedItem() 的实现除了从 internalQueue 中获取 一个 项之外什么都不做,然后它返回包裹在 new ArrayList 中的单个项。

标签: java multithreading thread-safety


【解决方案1】:

如果通过调用 addToQueue 将消息添加到提供程序类会发生什么?

getNextQueuedItem()addToQueue(...) 都是 synchronized 方法。如果只有这两种方法可以访问private ... internalQueue,那么多个线程就不可能同时访问internalQueue

当一个工作线程通过所有提供者并调用 getNextQueuedItem() 时,当另一个工作线程也通过所有提供者调用并调用 getNextQueuedItem() 时会发生什么?

您是在询问多个工作人员访问 相同 提供程序的情况吗?这不可能发生,因为getNextQueuedItem()synchronized 方法。

-- 或者--

您是在询问访问不同提供程序的不同工作人员吗?这应该无关紧要(至少,就BaseProvider 类而言),因为似乎没有任何方式可以将不同的对象相互连接。

【讨论】:

  • 感谢您的回复@solmon-slow。我还有一个问题 - message.commit();在处理消息之后被调用。现在,如果两个工作线程已经从提供者那里获取了消息,并且第一个工作线程正在花费时间来处理第一条消息,同时第二个工作线程已经完成了对第二条消息的处理并为该消息发送了 ack。由于第一条消息仍未确认,而第二条消息已确认,则第二条消息仍在队列中,直到第一条消息被确认。
  • @serah, Re, “...在第一条消息被确认之前,第二条消息是否会保留在队列中。”你问的是什么队列?如果您在代码示例中谈论 internalQueue,则 msg=internalQueue.take() 会从队列中删除一个项目。故事结局。如果您要询问其他队列,那么您的示例中没有显示,您可能应该为它开始一个新问题。
  • 谢谢,我已经提出了一个,只是在这里发表评论后意识到这需要一个新线程 - stackoverflow.com/questions/58489314/…
猜你喜欢
  • 1970-01-01
  • 2023-04-05
  • 1970-01-01
  • 1970-01-01
  • 2013-02-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多