Redis订阅发布

经过两天的尝试,终于成功地在云计算平台上完成了redis的订阅发布。从kafka接收数据到传送给hbase并插入到数据库中。
那么今天就来随笔记录一下redis的订阅发布模式的用法。

命令行模式的订阅发布

打开虚拟机,运行redis-cli,输入subscribe redischat,即订阅redischat信道。
另开一个终端,同样运行redis-cli 输入publish redischat helloworld,向信道中写入数据helloworld,如图所示:
Redis订阅发布
可以看到subscirbe这边已经已经接收到消息helloworld
这就是命令行的订阅发布。是不是很简单?接着我们就尝试用java代码来运行redis的订阅发布。

在windows上的eclipse完成redis的订阅发布

原本想直接就在虚拟机上运行eclipse进行redis的订阅发布运行代码。结果虚拟机一运行eclipse直接死机(我的机子没有办法添加内存条),多次尝试以后,直接放弃,选择用windows运行代码远程调用redis。
废话少说,直接的上代码:
首先是继承了JedisPubSub的Subscriber的类:

import redis.clients.jedis.JedisPubSub;

public class MySubscriber extends JedisPubSub{
	private String get_message="";
	public  String toString() {
		return get_message;
	}
	
	public Subscriber() {
    }
	
    public void onMessage(String channel, String message) {
        System.out.println(String.format("成功接收信道%s的消息,消息内容:%s", channel,message));
    }
    
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("成功接收信道 %s, subscribedChannels %d", 
                channel, subscribedChannels));
    }
}

接着是两个多线程的订阅类和发布类,因为订阅一旦执行就是阻塞式的,必须要不断的进行监听,所有使用多线程的方式来让代码正常执行。
首先是订阅类的函数:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class MySubThread extends Thread{
	private Thread t;
	private Jedis myJedis=null;
	private String myChannel="";
	private JedisPubSub myJedisPubSub=null;
	   
	public MySubThread(Jedis jedis,String channel,JedisPubSub jedispubsub) {
		myJedis=jedis;
		myChannel=channel;
		myJedisPubSub=jedispubsub;
	}
	
	public void run() {
		myJedis.subscribe(myJedisPubSub, myChannel);
	}
	
	public void start () {
		System.out.println("开始监听信道!");
		if (t == null) {
			t = new Thread (this);
			t.start ();
			}
	}

}

接着是发布类的函数:

import redis.clients.jedis.Jedis;

public class MyPubThread extends Thread{
	private Thread t;
	private Jedis myJedis=null;
	private String myChannel="";
	private String message="";
	
	public MyPubThread(Jedis jedis,String channel,String s) {
		myJedis=jedis;
		myChannel=channel;
		message=s;
	}
	
	public void run() {
		myJedis.publish(myChannel, message);
		System.out.println("成功发送消息:"+message);
	}
	
	public void start () {
		if (t == null) {
			t = new Thread (this);
			t.start ();
			}
		}
}

执行结果

代码进行完毕以后,再写一个测试函数,用于测试:

import redis.clients.jedis.Jedis;

public class testredis {
	public static void main(String[] args) {
		String redisIp="192.168.56.101";
		int redisPort=6379;
		String channel="redischat";
		String message="Hello world!";
		MySubThread testSubTread=new MySubThread(new Jedis(redisIp,redisPort),channel,new MySubscriber());
		testSubTread.start();
		for(int i=0;i<50;i++) {
			MyPubThread testPubThread=new MyPubThread(new Jedis(redisIp,redisPort),channel,message+i);
			testPubThread.start();
		}
	}
}

最后运行代码,结果如下图所示:
Redis订阅发布
以上就是redis订阅发布的全部内容,希望对大家有所帮助。

附注:

改代码的运行需要用到redis的jar包,大家可以从下面的链接下载,方便大家使用。
redis的jar包
提取码:dlau
导入方法大家可以上网查找,并不麻烦,谢谢。

相关文章: