【问题标题】:Calling Thread.Sleep in Subscriber thread causes Publisher thread to sleep在订阅者线程中调用 Thread.Sleep 会导致发布者线程休眠
【发布时间】:2018-07-26 06:09:32
【问题描述】:

我已经在我的应用程序中实现了发布和订阅模式,但是当我在任何一个订阅者中调用 Thread.sleep() 方法或我的任何一个订阅者抛出异常时,所有其他订阅者和发布者都会受到此影响,所以我该如何防止这种情况发生正在发生。

我已经为上述问题创建了一个小演示

发布者代码

import java.util.Random;

public class Publisher extends Thread {

    Broker broker = Broker.getInstance();
    Random random = new Random();

    @Override
    public void run() {
        while (true) {
            System.out.println("Published " + new Timestamp(System.currentTimeMillis()));
            broker.updateSubscribers(Integer.toString(random.nextInt(250)));
        }

    }
}

订阅者界面

public interface Subscriber {

    public void onUpdate(String message);
}

消息订阅者代码

import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");
        broker.subscribe(this);
    }

    @Override
    public void onUpdate(String message) {
        try {
            System.out.println(message);
            sleep(1000);                    // called sleep affects the publisher too
        } catch (InterruptedException ex) {
            Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

如您所见,我在 MessageSubscriber 中调用了 sleep 方法,这也会影响 Publisher 并使其在此期间也处于睡眠状态

编辑添加的经纪人代码

import java.util.ArrayList;
import java.util.List;

/**
 *
 * @author hemants
 */
public class Broker {

    List<Subscriber> subscribersList = new ArrayList<>();

    private Broker() {
    }

    public static Broker getInstance() {
        return BrokerHolder.INSTANCE;
    }

    private static class BrokerHolder {

        private static final Broker INSTANCE = new Broker();
    }

    public void subscribe(Subscriber s) {
        subscribersList.add(s);
    }

    public void unsubscribe(Subscriber s) {
        subscribersList.remove(s);
    }

    public void updateSubscribers(String message) {
        subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
    }
}

上面代码运行的主类

public class PubSubPattern {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.start();

        MessageSubscriber messageSubscriber = new MessageSubscriber();
        messageSubscriber.start();
    }
}

嗯,我已经编辑了我的 MessageSubscribe 代码,如下所示,它正在做我所期望的事情

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author hemants
 */
public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");

        while (true) {
            try {
                broker.subscribe(this);
                System.out.println("subscribed ");
                sleep(1000);
                broker.unsubscribe(this);
                System.out.println("un subscribed");
                sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    @Override
    public void onUpdate(String message) {
        System.out.println(message);
    }

}

你对此有什么看法

【问题讨论】:

  • Broker 来自哪里?
  • @LuCio 请检查我已添加代理代码
  • 这很明显。查看答案。
  • @LuCio 请查看编辑
  • 在订阅者线程中调用 Thread.Sleep 会导致发布者线程进入睡眠状态 此语句是错误的 - 在发布者的线程中调用睡眠,因此它会进入睡眠状态。睡眠被放置在订阅者代码中 - 这是真的。

标签: java multithreading design-patterns


【解决方案1】:

所以你执行一些这样的事情

subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));

onUpdatesleep期间

所以很有效

subscribersList.stream().forEach(subscriber -> Thread.sleep());

甚至更详细的

for(Subscriber sub:subscribers){
   Thread.sleep(xxx);
}

难怪它会“影响”其他侦听器,因为调用者在这里被阻止。调用者线程在每个元素上都休眠。

要么使用线程池提交更新任务,要么使用subscribersList.parallelStream()

我希望这仅用于教育目的。

【讨论】:

【解决方案2】:

您正在同一线程中更新订阅者,这就是它会影响其他订阅者的原因。并阻止发布者。

创建新线程来更新代理就可以了。

【讨论】:

  • 是的,我明白了,但我不能这样做,因为在我的应用程序中,发布率很高,比如每分钟 100 条记录,然后据此它将创建 100 个线程来每分钟更新代理
  • 然后就可以使用线程池了,指定你的机器可以处理的最大线程数。
  • 或者你也可以在broker中使用parallelStream()。
【解决方案3】:

这是一个快速的解决方案。我更新了MessageSubscriber 以将接口Subscriber 的使用保持在Broker 内:

public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");
        synchronized (broker) {
            broker.subscribe(this);
        }
        try {
            synchronized (this) {
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // TODO OP has to decide how to handle this
            // for example
            synchronized (broker) {
                broker.unsubscribe(this);
            }
        }
    }

    @Override
    public void onUpdate(String message) {
        try {
            synchronized (this) {
                notify();
            }
            System.out.println(message);
            sleep(1000); // called sleep affects the publisher too
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

我不确定run()InterruptedException 的处理,因为必须获取boker 的锁才能进入同步块。因此,线程可能会等待这个锁而不是有效地中断自己。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-31
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多