【问题标题】:Producer-consumers(many). Consumers take and put into the shared queue生产者-消费者(许多)。消费者获取并放入共享队列
【发布时间】:2013-04-13 08:16:29
【问题描述】:

我制作了一个生产者-消费者程序。它只是核心 java 中的一个程序,没有任何 GUI(Swing 或 SWT)。它有一个将对象放入队列的生产者。 还有一些消费者必须将一些人员(例如字符串)添加到该共享队列中的每个对象中。因此,每个消费者都必须处理共享队列中的每个对象。 在这种情况下 - 每个 BookShelf 都必须在“books”ArrayList 中包含来自所有消费者的项目。消费者。

问题:我应该在消费者中使用什么条件来正确完成他们的线程? 以下是程序的代码片段。也许我以错误的方式实现它。

这是队列的一个对象:

public class BookShelf {
private int id;
private String name;
private int height;
private int weigh;
List<String> books = Collections.synchronizedList(new ArrayList<String>());

public BookShelf(int id, String name) {
    this.id = id;
    this.name = name;
}
public void addBook(String book) {
  books.add(book);
}
public boolean eq(String book) {
synchronized (books) {
    for (String b: books) {
    if (b.equalsIgnoreCase(book)) {
        return true;
    }
    }
}
return false;
}
 other setters and getters..

}

这里是生产者类:

public class Producer implements Runnable {
private BlockingQueue myQueue;

public Producer(BlockingQueue myQueue) {
this.myQueue = myQueue;
}

public void run() {
for(int i=0; i<7; i++){
    try {
    System.out.println("Produced: " + i);
    BookShelf myBookShelf = new BookShelf(i, "book #" + i);
    myQueue.put(myBookShelf);
    } catch (InterruptedException ex) {
    //Proper handle
    }
}
}

}

这是消费者类之一:

 public class Consumer implements Runnable {
 private BlockingQueue myQueue;

public Consumer(BlockingQueue myQueue) {
    this.myQueue = myQueue; }

public void run() {
    while(true){
        try {
            BookShelf tempBookShelf = (BookShelf) myQueue.take();

            //eq() is my method to check if ArraList has a book.
            if (tempBookShelf.eq("Abc book")) {
                System.out.println("It already has book");
                myQueue.put(tempBookShelf);
                Thread.sleep(2000);
            } else {
                tempBookShelf.addBook("Abc book");
                myQueue.put(tempBookShelf);
                Thread.sleep(2000);
            }
        } catch (InterruptedException ex) {
            //Proper handle
        }
    }
}
}

这里是主类:

public class ProducerConsumerTest {

public static void main(String[] args) {

     BlockingQueue sharedQueue = new LinkedBlockingQueue();
     Thread prodThread = new Thread(new Producer(sharedQueue));
     Thread consThread = new Thread(new Consumer(sharedQueue));
     Thread consThread2 = new Thread(new Consumer2(sharedQueue));

     prodThread.start();
     consThread.start();
     consThread2.start();
}
 }

【问题讨论】:

  • 试试disruptor,它做得很好。
  • 我在 Eclipse 上粘贴它时遇到编译错误。请把真实的代码(比如Consumer2在哪里?)
  • 它只是核心java中的一个程序,没有任何GUI(Swing或SWT)。 Consumer2 只是 Consumer 的一个副本。
  • Disruptor 似乎是一个不错的解决方案。不幸的是,我必须在没有任何第三方框架的情况下用纯 Java 编写这个程序。
  • Consumer2 类吗?或者您是否打算将其仅作为 Consumer 的第二个实例。

标签: java multithreading producer-consumer blockingqueue


【解决方案1】:

向生产者注册每个消费者。每个消费者都有自己的队列,生产者将对象放入所有队列。然后每个消费者在对象的同一实例上进行处理。

    public interface Consumer{
        public void process(BookShelf bs);
    }

    public class Producer implements Runnable{
        private final List<Consumer> consumers = new CopyOnWriteArrayList<Consumer>(); // thread safe but not efficient with lots of changes

        public void register(Consumer c){
            consumers.add(c); // thread safe
        }

        public void run(){
            for(;;){
                BookShelf bs = generateBookShelfByWhateverMeans();
                for (Consumer c : consumers){
                    c.process(bs);
                }
            }
        }
    }

    public class BookShelfConsumer implements Runnable, Consumer{
        private final BlockingQueue<BookShelf> queue = new LinkedTransferQueue<BookShelf>(); // unbounded & thread safe

        public void process(BookShelf bs){
            queue.offer(bs); // non-blocking
        }

        public void run(){
            for(;;){
                BookShelf bs = queue.take(); // blocks until got object or interrupted 
                // catch InterruptedException 
                // do whatever this consumer is supposed to do with the object
            }
        }
    }

【讨论】:

  • 感谢您的回答。但这并不是我所需要的。是的,消费者处理队列中的每个对象。但是每个消费者都有自己的队列。我需要的是每个书架都有来自所有消费者的“books ArrayList”项目(不是你的例子中的一个)
  • 我不得不说我同意@Timmy。如果您需要每个消费者对每个对象进行操作,则生产者将保留主队列,每个消费者将使用相同的对象引用在自己的队列上工作(您可以将该引用用作信号量,以便每个消费者都可以编辑对象安全)
【解决方案2】:

我会尝试改用SwingWorker。它有一个在完成时执行的done() 方法。有关一些代码示例,请参见 this 页面。

如果你使用的不是 Swing,Swt 中有一个类似的功能叫做 Jobs。查看this 页面以获取示例。它还有一个在作业完成时执行的 done() 方法。

【讨论】:

  • 好吧,问题中没有一处说它是 Swing 应用程序。
【解决方案3】:

还有一些(N 个)消费者必须将一些人员(例如字符串)添加到该共享队列中的每个对象中

我假设您的意思是每个消费者都必须将他们的东西添加到每个进入队列的对象中。在这种情况下,这不是生产者-消费者问题,这更像是一个观察者可观察的问题。基本上,当创建一个新的BookShelf 时,即为Observable。所有Observers 都应该被告知BookShelf,并有机会添加他们自己的Book

【讨论】:

  • 感谢您的建议。但是在对该任务的描述中,有信息表明它必须是一种生产者-消费者模式。如果我像观察者模式那样实现它,那将是错误的;(
  • Welp,如果你所谓的消费者将东西推到同一个队列中,那么他们实际上就像生产者一样,对吧?老实说,你最好澄清你的问题。
【解决方案4】:

我建议在 Bookshelf 中使用 ConcurrentLinkedQueue 而不是同步列表 - 它是无锁的(不需要同步)并且可能会更有效。

要结束您的消费者,请将他们的 while(true) 循环更改为 while(!cancel) 循环。给每个消费者一个 cancel 布尔值作为初始化为 false 的实例变量,并给他们一个 cancel() 方法,将 cancel 设置为 true。完成后,请致电 cancel() 联系您的消费者。如果您总是一次取消所有消费者(而不是选择性地取消一些但不取消其他消费者),那么您可以使用静态cancel 而不是实例cancel

【讨论】:

  • 这是个好主意!谁能建议我应该使用什么条件来调用 cancel() ?如果我像if (sharedQueue.size() == 0) { setCancel(); } 那样检查它,它就不能正常工作,因为当一个消费者检查队列的大小时,其他消费者可以取然后将对象放入队列中。但是这个对象不会被处理,因为这个消费者由于 cancel() 方法而结束了。
  • 生产者应该在其运行方法终止时调用cancel()。如果这是您实现它的方式,那么消费者的 while 循环应该是 while(!cancel &amp;&amp; !queue.isEmpty()) 以便您在取消消费者之前完成消费队列。使用!isEmpty() 而不是!size() == 0,因为一些队列实现使用线性时间size() 方法
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-04-22
  • 2012-01-28
  • 1970-01-01
  • 1970-01-01
  • 2015-11-27
相关资源
最近更新 更多