【问题标题】:Different results in run and deburg mode with Multithread producer-consumer使用多线程生产者-消费者在运行和调试模式下的不同结果
【发布时间】:2015-07-12 19:20:10
【问题描述】:

在一个 JPA 项目中,我有两个线程:生产者和消费者。

生产者应该从另一个类中获取twitter查询的搜索结果并将其放入LinkedBlockingQueue,线程消费者应该消费结果并使用另一个类将它们持久化到MYSQL。

这里我先给大家介绍一下主类,生产者和消费者:

类:

public class RunQuery {

    public final static EntityManagerFactory   
  emf=Persistence.createEntityManagerFactory("mypu");

    public static void main(String[] args) throws Exception

        {
            org.apache.log4j.BasicConfigurator.configure(new 
     NullAppender());

            BlockingQueue<Pair<String, QueryResult>> tweetBuffer=new 
     LinkedBlockingQueue<Pair<String,QueryResult>>();

            // Creating Producer and Consumer Thread
            Runnable producerThread = new 
         TwitterStreamProducer("producer",tweetBuffer,args);

            Runnable consumerThread = new 
       TwitterStreamConsumer("consumer",tweetBuffer);

           Thread producer=new Thread(producerThread);
           Thread consumer=new Thread(consumerThread);

           producer.start();  

           Thread.sleep(100);

            consumer.start();               
        }

制片人话题:

   public class TwitterStreamProducer implements Runnable{

    private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
    private final ResultsController resultsController;
    private String[] Keywords;
    private String name;

    public TwitterStreamProducer(String name,
            BlockingQueue<Pair<String, QueryResult>> tweetBuffer, 
           String[] 

        keywords) {

        this.tweetBuffer=tweetBuffer;
        this.resultsController=new ResultsController();
        this.Keywords=keywords;
        this.name=name;

    }

    public void run() {

        for(String key:Keywords)
        {
        boolean interrupted = false;
        try {
            this.tweetBuffer.put( new Pair<String,QueryResult>
      (key,resultsController.search(key)) );
            Logger.getLogger("TwitterApp").info("one result added to the
       queue, current size:"+tweetBuffer.size());
        }

        catch (InterruptedException e) {
              interrupted = true;
        //  Logger.getLogger("Producer").info( this.name+ " Interrupted
            "+e.toString() );
            }
        catch(Exception e)
        {
            interrupted = true;
        }
        finally {
            if (interrupted)
                Thread.currentThread().interrupt();
                }

        }


    }

消费者话题:

public class TwitterStreamConsumer  implements Runnable{

    private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
    private final ResultsController resultsController;
    private String name=null;

    public TwitterStreamConsumer(String name,
            BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {

        this.name=name;
        this.tweetBuffer=tweetBuffer;
        this.resultsController=new ResultsController();
    }

    public void run() {


        while(! tweetBuffer.isEmpty())
        {
        try {
            Logger.getLogger("TwitterApp").info(this.name+" has consummed
     queue, current size of queue:"+tweetBuffer.size());
            resultsController.parse(this.tweetBuffer.take());
        } catch (InterruptedException e) {
        Logger.getLogger("Consumer").info(this.name+ " interrupted   
    "+e.getMessage());
        }

        }

    }

}

如果需要其他信息,我会在这里提供。

这是Logger结果,当我运行它时,生产者总是产生8个结果,之后什么都没有发生,应用程序不会被中断或不会产生任何错误:

Jul 12, 2015 12:07:22 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:1
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:2
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:3
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:4
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:5
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:6
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:7
Jul 12, 2015 12:07:26 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:8

当我在消费者线程中使用 run 方法开头的断点进行调试时,它可以正常工作。

提前感谢您的建议。

【问题讨论】:

    标签: java mysql multithreading producer-consumer


    【解决方案1】:
    while(! tweetBuffer.isEmpty())
    

    如果队列为空,这将跳过循环。因此,如果生产者在将某些内容存储到队列中之前需要一些时间,那么您的消费者不会做任何事情。摆脱那张支票。只需使用一个循环,当线程中断或队列包含假结束标记元素时停止,并重复从队列中取出下一个元素。

    【讨论】:

      【解决方案2】:

      您的消费者在第一次运行时结束。第一次执行时,缓冲区应该是空的,然后它不会进入while循环并结束run方法和线程。

      你必须循环等待缓冲区,有时,如果缓冲区是空的,这不应该结束消费者。它只需要等待下一条消息。 你可以用这样的代码来做到这一点:

      public class TwitterStreamConsumer  implements Runnable{
      
          private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
          private final ResultsController resultsController;
          private String name=null;
          private boolean stopped;
      
          public TwitterStreamConsumer(String name, BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {
      
              this.name=name;
              this.tweetBuffer=tweetBuffer;
              this.resultsController=new ResultsController();
          }
      
          public void stop() {
              stopped = true;
          }
      
          public void run() {
              stopped = false;
      
      
              while(! stopped) {
                  try {
                      resultsController.parse(this.tweetBuffer.take());
                      Logger.getLogger("TwitterApp").info(this.name+" has consummed queue, current size of queue:"+tweetBuffer.size());
                  }
                  catch (InterruptedException e) {
                      Logger.getLogger("Consumer").info(this.name+ " interrupted "+e.getMessage());
                  }
              }
          }
      }
      

      要停止线程,您必须执行以下操作:

      consumer.stop();
      consumerThread.interrupt();
      

      【讨论】:

      • 为什么使用(非易失性)停止布尔值而不是简单地检查当前线程是否已被中断?
      • 线程可能因多种原因被中断(取决于应用程序的复杂性)。 stop 布尔值提供了一种简单的方法来确保中断的目的是停止消费者执行。
      • 谢谢,上面的代码放在哪里?消费者.stop(); consumerThread.interrupt();
      • @javadeveloper 在你想停止你的程序时。
      • 那有什么条件呢?
      【解决方案3】:

      您的消费者总是会遇到一个空队列并死掉。当你调试它时,你会自动让它等待足够长的时间让生产者填满队列。

      【讨论】:

      • 感谢我所有的朋友。现在的问题是生产者总是产生所有预期的结果,然后消费者开始消费。由于我改变了课程,我在这里提出了另一个问题:stackoverflow.com/questions/31367478/…
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多