【发布时间】:2018-07-26 06:09:32
【问题描述】:
我已经在我的应用程序中实现了发布和订阅模式,但是当我在任何一个订阅者中调用 Thread.sleep() 方法或我的任何一个订阅者抛出异常时,所有其他订阅者和发布者都会受到此影响,所以我该如何防止这种情况发生正在发生。
我已经为上述问题创建了一个小演示
发布者代码
import java.util.Random;
public class Publisher extends Thread {
Broker broker = Broker.getInstance();
Random random = new Random();
@Override
public void run() {
while (true) {
System.out.println("Published " + new Timestamp(System.currentTimeMillis()));
broker.updateSubscribers(Integer.toString(random.nextInt(250)));
}
}
}
订阅者界面
public interface Subscriber {
public void onUpdate(String message);
}
消息订阅者代码
import java.util.logging.Level;
import java.util.logging.Logger;
public class MessageSubscriber extends Thread implements Subscriber {
Broker broker = Broker.getInstance();
@Override
public void run() {
System.out.println("MessageSubscriber started...");
broker.subscribe(this);
}
@Override
public void onUpdate(String message) {
try {
System.out.println(message);
sleep(1000); // called sleep affects the publisher too
} catch (InterruptedException ex) {
Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
如您所见,我在 MessageSubscriber 中调用了 sleep 方法,这也会影响 Publisher 并使其在此期间也处于睡眠状态
编辑添加的经纪人代码
import java.util.ArrayList;
import java.util.List;
/**
*
* @author hemants
*/
public class Broker {
List<Subscriber> subscribersList = new ArrayList<>();
private Broker() {
}
public static Broker getInstance() {
return BrokerHolder.INSTANCE;
}
private static class BrokerHolder {
private static final Broker INSTANCE = new Broker();
}
public void subscribe(Subscriber s) {
subscribersList.add(s);
}
public void unsubscribe(Subscriber s) {
subscribersList.remove(s);
}
public void updateSubscribers(String message) {
subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
}
}
上面代码运行的主类
public class PubSubPattern {
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.start();
MessageSubscriber messageSubscriber = new MessageSubscriber();
messageSubscriber.start();
}
}
嗯,我已经编辑了我的 MessageSubscribe 代码,如下所示,它正在做我所期望的事情
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
* @author hemants
*/
public class MessageSubscriber extends Thread implements Subscriber {
Broker broker = Broker.getInstance();
@Override
public void run() {
System.out.println("MessageSubscriber started...");
while (true) {
try {
broker.subscribe(this);
System.out.println("subscribed ");
sleep(1000);
broker.unsubscribe(this);
System.out.println("un subscribed");
sleep(1000);
} catch (InterruptedException ex) {
Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
@Override
public void onUpdate(String message) {
System.out.println(message);
}
}
你对此有什么看法
【问题讨论】:
-
Broker来自哪里? -
@LuCio 请检查我已添加代理代码
-
这很明显。查看答案。
-
@LuCio 请查看编辑
-
在订阅者线程中调用 Thread.Sleep 会导致发布者线程进入睡眠状态 此语句是错误的 - 在发布者的线程中调用睡眠,因此它会进入睡眠状态。睡眠被放置在订阅者代码中 - 这是真的。
标签: java multithreading design-patterns