【发布时间】:2015-07-12 19:20:10
【问题描述】:
在一个 JPA 项目中,我有两个线程:生产者和消费者。
生产者应该从另一个类中获取twitter查询的搜索结果并将其放入LinkedBlockingQueue,线程消费者应该消费结果并使用另一个类将它们持久化到MYSQL。
这里我先给大家介绍一下主类,生产者和消费者:
主类:
public class RunQuery {
public final static EntityManagerFactory
emf=Persistence.createEntityManagerFactory("mypu");
public static void main(String[] args) throws Exception
{
org.apache.log4j.BasicConfigurator.configure(new
NullAppender());
BlockingQueue<Pair<String, QueryResult>> tweetBuffer=new
LinkedBlockingQueue<Pair<String,QueryResult>>();
// Creating Producer and Consumer Thread
Runnable producerThread = new
TwitterStreamProducer("producer",tweetBuffer,args);
Runnable consumerThread = new
TwitterStreamConsumer("consumer",tweetBuffer);
Thread producer=new Thread(producerThread);
Thread consumer=new Thread(consumerThread);
producer.start();
Thread.sleep(100);
consumer.start();
}
制片人话题:
public class TwitterStreamProducer implements Runnable{
private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
private final ResultsController resultsController;
private String[] Keywords;
private String name;
public TwitterStreamProducer(String name,
BlockingQueue<Pair<String, QueryResult>> tweetBuffer,
String[]
keywords) {
this.tweetBuffer=tweetBuffer;
this.resultsController=new ResultsController();
this.Keywords=keywords;
this.name=name;
}
public void run() {
for(String key:Keywords)
{
boolean interrupted = false;
try {
this.tweetBuffer.put( new Pair<String,QueryResult>
(key,resultsController.search(key)) );
Logger.getLogger("TwitterApp").info("one result added to the
queue, current size:"+tweetBuffer.size());
}
catch (InterruptedException e) {
interrupted = true;
// Logger.getLogger("Producer").info( this.name+ " Interrupted
"+e.toString() );
}
catch(Exception e)
{
interrupted = true;
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
}
消费者话题:
public class TwitterStreamConsumer implements Runnable{
private final BlockingQueue<Pair<String, QueryResult>> tweetBuffer;
private final ResultsController resultsController;
private String name=null;
public TwitterStreamConsumer(String name,
BlockingQueue<Pair<String, QueryResult>> tweetBuffer) {
this.name=name;
this.tweetBuffer=tweetBuffer;
this.resultsController=new ResultsController();
}
public void run() {
while(! tweetBuffer.isEmpty())
{
try {
Logger.getLogger("TwitterApp").info(this.name+" has consummed
queue, current size of queue:"+tweetBuffer.size());
resultsController.parse(this.tweetBuffer.take());
} catch (InterruptedException e) {
Logger.getLogger("Consumer").info(this.name+ " interrupted
"+e.getMessage());
}
}
}
}
如果需要其他信息,我会在这里提供。
这是Logger结果,当我运行它时,生产者总是产生8个结果,之后什么都没有发生,应用程序不会被中断或不会产生任何错误:
Jul 12, 2015 12:07:22 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:1
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:2
Jul 12, 2015 12:07:23 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:3
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:4
Jul 12, 2015 12:07:24 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:5
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:6
Jul 12, 2015 12:07:25 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:7
Jul 12, 2015 12:07:26 PM main.TwitterStreamProducer run
INFORMATION: one result added to the queue, current size:8
当我在消费者线程中使用 run 方法开头的断点进行调试时,它可以正常工作。
提前感谢您的建议。
【问题讨论】:
标签: java mysql multithreading producer-consumer