redis 除了作为缓存的功能外还可以用作消息中间件的功能,这片博客主要是介绍一下 redis 整合spring 实现消息的发布和订阅功能;
1:redis依赖,依赖两个包,redis 包, spring-redis 包用于整合redis,这里就不介绍了,具体可以参考上一篇博客 :redis 缓存 中的介绍;
2:redis和spring的整合:
<bean >
<property name="maxIdle" value="1" />
<property name="maxTotal" value="5" />
<property name="blockWhenExhausted" value="true" />
<property name="maxWaitMillis" value="30000" />
<property name="testOnBorrow" value="true" />
</bean>
<bean >
<property name="hostName" value="localhost" />
<property name="port" value="6379"/>
<property name="poolConfig" ref="jedisPoolConfig" />
<property name="usePool" value="true"/>
</bean>
<bean >
<property name="connectionFactory" ref="jedisConnectionFactory" />
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="valueSerializer">
<bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashValueSerializer">
<bean class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/>
</property>
</bean>
<!-- redis 消息发布订阅 -->
<bean />
<!-- 这里配置线程池任务 -->
<bean >
<property name="corePoolSize" value="4"/>
<property name="maxPoolSize" value="4"/>
<property name="queueCapacity" value="100000"/>
</bean>
<!-- 配置redis container 将监听类注入到redis容器中,实现监听容器中指定 主题 队列的功能 -->
<bean >
destroy-method="destroy">
<property name="connectionFactory" ref="jedisConnectionFactory"/>
<property name="taskExecutor">
<ref bean="taskExecutor" />
</property>
<property name="messageListeners">
<map>
<entry key-ref="myRedisListener">
<bean class="org.springframework.data.redis.listener.ChannelTopic">
<constructor-arg value="push:myredis"/>
</bean>
</entry>
</map>
</property>
</bean>
3:监听类 该类只要实现 MessageListener 接口即可,并且在重写的方法中进行需要的业务处理:
public class MyRedisListener implements MessageListener{ @Override public void onMessage(Message message, byte[] pattern) { byte[] body = message.getBody(); byte[] channel = message.getChannel(); // redisTemplate.convertAndSend("push:myredis","this is the redis subscribe!!"); String channelStr = new String(channel); String bodyStr = new String(body); System.out.println("渠道为:"+channelStr+"消息为:"+bodyStr); //这里的业务处理只做简单的打印输出 } }
4:消息发布类:
在消息发布类中需要使用到 redisTemplate 来进行消息的发布,其中,消息发布的方法为 redisTemplate.convertAndSend(String channel,String mess);
需要指定消息发布的 通道名称,这里的通道和监听中配置的渠道名称一致 ;mess 就是你需要发布到该通道上的内容;
5:消息的发布:
@Test public void redisSubTest() throws InterruptedException { ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-redis.xml"); RedisTemplate redisTemplate = (RedisTemplate) ctx.getBean("redisTemplate"); redisTemplate.convertAndSend("push:myredis","this is the redis subscribe!!"); Thread.sleep(5000); }
到这里为止,一个简单的基于redis的订阅发布就实现了,在项目中你可以根据你的具体业务来实现功能