【问题标题】:Single Producer Single Consumer now I Need Multiple Consumers单一生产者单一消费者现在我需要多个消费者
【发布时间】:2011-12-16 19:36:30
【问题描述】:

我有一种情况,我编写了一个简单的生产者消费者模型,用于从蓝牙读取数据块,然后每 10k 字节将其写入文件。我使用了一个标准的 P-C 模型,使用 Vector 作为我的消息持有者。那么我该如何改变这一点,以便多个线程消费者可以读取相同的消息,我认为这个术语是多播?我实际上是在 Android 手机上使用它,所以 JMS 可能不是一个选项。

static final int MAXQUEUE = 50000; 
private Vector<byte[]> messages = new Vector<byte[]>(); 

/**
 * Put the message in the queue for the Consumer Thread
 */
private synchronized void putMessage(byte[] send) throws InterruptedException { 

    while ( messages.size() == MAXQUEUE ) 
        wait(); 
    messages.addElement( send ); 
    notify(); 
} 


/**
 * This method is called by the consumer to see if any messages in the queue
 */
public synchronized byte[] getMessage()throws InterruptedException { 
    notify(); 
    while ( messages.size() == 0 && !Thread.interrupted()) {
        wait(1); 
    }
    byte[] message = messages.firstElement(); 
    messages.removeElement( message ); 
    return message; 
} 

我引用的是 Oreilly 书籍 Message Parser 部分中的代码

【问题讨论】:

    标签: java android message-queue


    【解决方案1】:

    Pub-sub 机制绝对是实现你想要的方式。我不确定为什么为 Android 开发会限制您使用 JMS,这是一个非常简单的规范。退房 this thread 在 SO。

    【讨论】:

    • 是的,但这主要是在谈论使用 JMS,这对于我在电话应用程序上在线程之间发送消息所需要的东西来说有点过火了。
    • 在这种情况下,将您的 Vector(或 Hanno Binder 建议的队列)简单地包装成 Observer/Observable 模式吗?
    【解决方案2】:

    您绝对应该使用queue 而不是Vector
    给每个线程自己的队列,当收到新消息时,add() 将新消息发送到每个线程的队列。为了灵活性,侦听器模式也可能有用。

    编辑:

    好的,我觉得我也应该添加一个示例:
    (经典观察者模式)

    这是接口,所有消费者都必须实现:

    public interface MessageListener {
      public void newMessage( byte[] message );
    }
    

    生产者可能如下所示:

    public class Producer {
      Collection<MessageListener> listeners = new ArrayList<MessageListener>();
    
    
      // Allow interested parties to register for new messages
      public void addListener( MessageListener listener ) {
        this.listeners.add( listener );
      }
    
      public void removeListener( Object listener ) {
        this.listeners.remove( listener );
      }
    
      protected void produceMessages() {
        byte[] msg = new byte[10];
    
        // Create message and put into msg
    
        // Tell all registered listeners about the new message:
        for ( MessageListener l : this.listeners ) {
          l.newMessage( msg );
        }
    
      }
    }
    

    消费者类可能是(使用阻塞队列为我们完成所有wait()ing 和notify()ing):

    public class Consumer implements MessageListener {
    
      BlockingQueue< byte[] > queue = new LinkedBlockingQueue< byte[] >();
    
      // This implements the MessageListener interface:
      @Override
      public void newMessage( byte[] message ) {
        try {
          queue.put( message );
        } catch (InterruptedException e) {
            // won't happen.
        }
      }
    
        // Execute in another thread:       
        protected void handleMessages() throws InterruptedException {
            while ( true ) {
                byte[] newMessage = queue.take();
    
                // handle the new message.
            }
        }
    
    
    }
    

    【讨论】:

    • Vector 和 Queue 都只能保存对象,不能保存图元。使用阻塞队列,上面的整个代码就变得像 queue.put(message) 和 queue.take() 一样简单。
    • 我更倾向于你的例子,因为我无法让 SettableFuture 工作。
    • 我总是使用 Listeners 处理类似的事情,甚至编写了一个完整的 GUI 状态监听器系统,用于从 JMS 系统获取实时数据并使用这些状态更改更新自定义组件。
    【解决方案3】:

    这是我在挖掘一些代码并修改一些现有示例时提出的示例。

    package test.messaging;
    
    import java.util.ArrayList;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class TestProducerConsumers {
    
        static Broker broker;
    
        public TestProducerConsumers(int maxSize) {
            broker = new Broker(maxSize);
            Producer p = new Producer();
            Consumer c1 = new Consumer("One");
            broker.consumers.add(c1);
            c1.start();
    
            Consumer c2 = new Consumer("Two");
            broker.consumers.add(c2);
            c2.start();
    
            p.start();
        }
    
        // Test Producer, use your own message producer on a thread to call up
        // broker.insert() possibly passing it the message instead.
        class Producer extends Thread {
    
            @Override
            public void run() {
                while (true) {
                    try {
                        broker.insert();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        class Consumer extends Thread {
            String myName;
            LinkedBlockingQueue<String> queue;
    
            Consumer(String m) {
                this.myName = m;
                queue = new LinkedBlockingQueue<String>();
            }
    
            @Override
            public void run() {
                while(!Thread.interrupted()) {
                    try {
                        while (queue.size() == 0 && !Thread.interrupted()) {
                            ;
                        }
                        while (queue.peek() == null && !Thread.interrupted()) {
                            ;
                        }
                        System.out.println("" + myName + " Consumer: " + queue.poll());
                    } catch (Exception e) { }
                }
            }
        }
    
        class Broker {
            public ArrayList<Consumer> consumers = new ArrayList<Consumer>();
    
            int n;
            int maxSize;
    
            public Broker(int maxSize) {
                n = 0;
                this.maxSize = maxSize;
            }
    
            synchronized void insert() throws InterruptedException {
                        // only here for testing don't want it to runaway and 
                        //memory leak, only testing first 100 samples.
                if (n == maxSize)
                    wait();
                System.out.println("Producer: " + n++);
                for (Consumer c : consumers) {
                    c.queue.add("Message " + n);
                }
            }
    
        }
    
        public static void main(String[] args) {
            TestProducerConsumers pc = new TestProducerConsumers(100);
    
        }
    }
    

    【讨论】:

    • 不要在消费者中使用那些 while(...) 循环来忙等待!这真是糟糕的编码。 阻塞队列会为你处理所有这些,包括insert()中的wait()。 (相应的 notify() 调用在哪里?)
    • 你不需要通知插入,因为消费者没有使用 wait() 'while's 在那里进行测试。我在消费者中投入了一些睡眠而不是等待,因为我的消息不需要是实时的。
    • 那么,“if (n == maxSize) wait();”是怎么回事?
    • 再次测试只想要 100 条消息
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-02-24
    • 2015-04-05
    • 2018-03-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-11-08
    相关资源
    最近更新 更多