【问题标题】:Sending processed messages to specific threads将处理后的消息发送到特定线程
【发布时间】:2011-08-04 04:17:33
【问题描述】:

我有一组线程,其中每个线程都必须等待其所需的输入,进行一些计算,最后将其输出值发送到特定线程。

我计划有一个包含线程名称和线程本身的全局映射,以便让每个线程按名称获取其“后继”线程,然后将值发送给它们。

首先,我查看了使用阻塞队列的 Producer-Consumer 示例:

class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while(true) { 
                System.out.println("Waiting for input");
                consume(queue.take()); 
            }
        } catch (InterruptedException ex) { 
            ex.printStackTrace();
        }
    }

    void consume(Object x) { 
        System.out.println("Received: " + x);
    }
}

class Setup {
    public static void main(String...args) {
        BlockingQueue q = new ArrayBlockingQueue<String>(10);
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

我认为我可以为每个线程设置一个阻塞队列。然后,消费者线程将在 queue.take() 上循环,直到它接收到所有所需的值。

后来,我发现了这个post,其中提出了与我类似的问题。提出的解决方案似乎比阻塞队列解决方案更容易:它基于只在我想要的线程上调用一个方法,然后将消息发送到。

我想就这两种方法中的哪一种最好,或者是否有更好的方法来实现我想要的,我想请您提供一些建议(因为我认为这是一种常见情况)。

非常感谢您的帮助。

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    消费者-生产者很好。 (您在参考 SO 问题中提到的那个“答案”是一罐蠕虫......仔细考虑......)

    您可以使用QueuePipe,甚至可以使用PipedInputStreamPipedOutputStream。还有Exchanger

    这里是来自 Exchanger javadoc 的示例模型。不用担心嵌套类,它只是一种紧凑的风格——与主题完全无关。

    这里我们有一个“管道”类。它有 2 个线程(名称中的 R/L 指的是左、右)。管道流量为 R->L。

    /* 
     * mostly based on 
     * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html 
     */
    package so_6936111;
    
    import java.util.concurrent.Exchanger;
    
    public class WorkflowDemo {
    
        public static void main(String[] args) {
            Pipeline pipeline = new Pipeline();
            pipeline.start();
        }
        // ----------------------------------------------------------------
        // Pipeline
        // ----------------------------------------------------------------
    
        public static class Pipeline {
    
            /** exchanger for messages */
            Exchanger<Message> exchanger = new Exchanger<Message>();
    
            /* the two message instances that are passed back and forth */
            Message msg_1 = new Message();
            Message msg_2 = new Message();
    
            /** startups the pipeline */
            void start() {
                new Thread(new WorkerR()).start();
                new Thread(new WorkerL()).start();
            }
    
    
            /** Message objects are passed between workflow threads */
            public static class Message {
                private Object content;
                public Object getContent() { return content; }
                public void setContent(Object c) { this.content = c; }
            }
    
    
            /** WorkerR is at the head of the pipeline */
            class WorkerR implements Runnable {
                public void run() {
                    Message message = msg_1;
                    try {
                        while (true) {
                            Object data = doSomeWork();
                            message.setContent(data);
                            message = exchanger.exchange(message);
                        }
                    } catch (InterruptedException ex) { ex.printStackTrace();}
                }
                /** 
                 * let's pretend this is where you get your 
                 * initial data and do some work
                 */
                 private Object doSomeWork() {
                    return String.format("STEP-1@t:%d", System.nanoTime());
                 }
            }
    
            /** WorkerL is at the tail of the pipeline */
            class WorkerL implements Runnable {
                public void run() {
                    Message message = msg_2;
                    try {
                        while (true) {
                            message = exchanger.exchange(message);
                            Object data = doPostProcessing(message.getContent());
                            System.out.format("%s\n", data);
                        }
                    } catch (InterruptedException ex) { ex.printStackTrace();}
                }
    
                /**
                 * Let's pretend this is where the 2nd step of the workflow.
                 */
                private Object doPostProcessing(Object data) {
                    return String.format("%s | STEP-2@t:%d", data, System.nanoTime());
                }
            }
        }
    }
    

    输出:

    STEP-1@t:1312434325594730000 | STEP-2@t:1312434325594747000
    STEP-1@t:1312434325594750000 | STEP-2@t:1312434325594765000
    STEP-1@t:1312434325594768000 | STEP-2@t:1312434325594784000
    STEP-1@t:1312434325594787000 | STEP-2@t:1312434325594804000
    STEP-1@t:1312434325594806000 | STEP-2@t:1312434325594823000
    STEP-1@t:1312434325594826000 | STEP-2@t:1312434325594841000
    ...
    

    【讨论】:

    • 感谢您的回答。交换器看起来像我需要的,你是对的,我指出的另一个解决方案看起来不太好。我有最后一个问题:如果我有多个线程,假设生产者想向一个特定线程发送消息,我该怎么做?我应该每个线程有一个 Exchanger 实例吗?如果这是一个愚蠢的问题,我很抱歉,但我不想重新发明轮子。谢谢!
    • Rafael:只需用您想要的确切细节更新您的问题。 (消费者/生产者的代码只是噪音,仅供参考)。 Exchanger 是 1:1、阻塞、无缓冲的 hand-off,只是为了让您了解为什么需要在 Q 中准确指定您想要什么,数据应该如何流动,等等。上面只是一个toy,向你展示了一个1:1 管道。 1:N 还是 N:1?使用 java.util.concurrent Queues。
    猜你喜欢
    • 1970-01-01
    • 2012-06-21
    • 1970-01-01
    • 1970-01-01
    • 2013-06-18
    • 2015-06-12
    • 1970-01-01
    • 2019-03-06
    • 2013-12-02
    相关资源
    最近更新 更多