Redis订阅发布
经过两天的尝试,终于成功地在云计算平台上完成了redis的订阅发布。从kafka接收数据到传送给hbase并插入到数据库中。
那么今天就来随笔记录一下redis的订阅发布模式的用法。
命令行模式的订阅发布
打开虚拟机,运行redis-cli,输入subscribe redischat,即订阅redischat信道。
另开一个终端,同样运行redis-cli 输入publish redischat helloworld,向信道中写入数据helloworld,如图所示:
可以看到subscirbe这边已经已经接收到消息helloworld
这就是命令行的订阅发布。是不是很简单?接着我们就尝试用java代码来运行redis的订阅发布。
在windows上的eclipse完成redis的订阅发布
原本想直接就在虚拟机上运行eclipse进行redis的订阅发布运行代码。结果虚拟机一运行eclipse直接死机(我的机子没有办法添加内存条),多次尝试以后,直接放弃,选择用windows运行代码远程调用redis。
废话少说,直接的上代码:
首先是继承了JedisPubSub的Subscriber的类:
import redis.clients.jedis.JedisPubSub;
public class MySubscriber extends JedisPubSub{
private String get_message="";
public String toString() {
return get_message;
}
public Subscriber() {
}
public void onMessage(String channel, String message) {
System.out.println(String.format("成功接收信道%s的消息,消息内容:%s", channel,message));
}
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("成功接收信道 %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
接着是两个多线程的订阅类和发布类,因为订阅一旦执行就是阻塞式的,必须要不断的进行监听,所有使用多线程的方式来让代码正常执行。
首先是订阅类的函数:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class MySubThread extends Thread{
private Thread t;
private Jedis myJedis=null;
private String myChannel="";
private JedisPubSub myJedisPubSub=null;
public MySubThread(Jedis jedis,String channel,JedisPubSub jedispubsub) {
myJedis=jedis;
myChannel=channel;
myJedisPubSub=jedispubsub;
}
public void run() {
myJedis.subscribe(myJedisPubSub, myChannel);
}
public void start () {
System.out.println("开始监听信道!");
if (t == null) {
t = new Thread (this);
t.start ();
}
}
}
接着是发布类的函数:
import redis.clients.jedis.Jedis;
public class MyPubThread extends Thread{
private Thread t;
private Jedis myJedis=null;
private String myChannel="";
private String message="";
public MyPubThread(Jedis jedis,String channel,String s) {
myJedis=jedis;
myChannel=channel;
message=s;
}
public void run() {
myJedis.publish(myChannel, message);
System.out.println("成功发送消息:"+message);
}
public void start () {
if (t == null) {
t = new Thread (this);
t.start ();
}
}
}
执行结果
代码进行完毕以后,再写一个测试函数,用于测试:
import redis.clients.jedis.Jedis;
public class testredis {
public static void main(String[] args) {
String redisIp="192.168.56.101";
int redisPort=6379;
String channel="redischat";
String message="Hello world!";
MySubThread testSubTread=new MySubThread(new Jedis(redisIp,redisPort),channel,new MySubscriber());
testSubTread.start();
for(int i=0;i<50;i++) {
MyPubThread testPubThread=new MyPubThread(new Jedis(redisIp,redisPort),channel,message+i);
testPubThread.start();
}
}
}
最后运行代码,结果如下图所示:
以上就是redis订阅发布的全部内容,希望对大家有所帮助。
附注:
改代码的运行需要用到redis的jar包,大家可以从下面的链接下载,方便大家使用。
redis的jar包
提取码:dlau
导入方法大家可以上网查找,并不麻烦,谢谢。