为什么使用消息传递
- 缓存的数据需要在许可证服务的所有实例之间保存一致——这意味着不能将数据缓存到服务实例的内存中。
- 在更新或者删除一个组织数据时,许可证服务缓存的数据需要失效——避免读取到过期数据,需要尽早让过时数据失效并删除。
-
使用同步请求--响应模型来实现。组织服务在组织数据变化时调用许可证服务的接口通知组织服务已经变化,或者直接操作许可证服务的缓存。
-
使用事件驱动。组织服务发出一个异步消息。许可证服务收到该消息后清除对应的缓存。
同步请求-响应方式
- 组织服务和许可证服务紧密耦合
- 这种方式不够灵活,如果要为组织服务添加新的消费者,必须修改组织服务代码,以让其通知新的服务数据变动。
使用消息传递方式
- 松耦合性:将服务间的依赖,变成了服务对队列的依赖,依赖关系变弱了。
- 耐久性:即使服务消费者已经关闭了,也可以继续往里发送消息,等消费者开启后处理
- 可伸缩性: 消息发送者不用等待消息消费者的响应,它们可以继续做各自的工作
- 灵活性:消息发送者不用知道谁会消费这个消息,因此在有新的消息消费者时无需修改消息发送代码
spring cloud 中使用消息传递
spring cloud stream 架构
-
发射器
当一个服务准备发送消息时,它将使用发射器发布消息。发射器是一个 Spring 注解接口,它接收一个普通 Java 对象,表示要发布的消息。发射器接收消息,然后序列化(默认序列化为 JSON)后发布到通道中。 -
通道
通道是对队列的一个抽象。通道名称是与目标队列名称相关联的。但是队列名称并不会直接公开在代码中,代码永远只会使用通道名。 -
绑定器
绑定器是 spring cloud stream 框架的一部分,它是与特定消息平台对话的 Spring 代码。通过绑定器,使得开发人员不必依赖于特定平台的库和 API 来发布和消费消息。 -
接收器
服务通过接收器来从队列中接收消息,并将消息反序列化。
实战
建立 redis 服务
docker run -itd --name redis --net host redis:建立 kafka 服务在组织服务中编写消息生产者
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId></dependency><dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>
@EnableBinding(Source.class)public class SimpleSource { private Logger logger = LoggerFactory.getLogger(SimpleSource.class); private Source source; @Autowired public SimpleSource(Source source) { this.source = source; } public void publishOrChange(String action, String orgId) { logger.info("在请求:{}中,发送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId); OrganizationChange change = newOrganizationChange(action, orgId, UserContextHolder.getContext().id); source.output().send(MessageBuilder.withPayload(change).build()); }}
@Autowiredprivate SimpleSource simpleSource;@DeleteMapping(value = "/organization/{orgId}")public void deleteOne(@PathVariable("orgId") String id) { logger.debug("删除了组织:{}", id); simpleSource.publishOrChange("delete", id);}
# 省略了其他配置spring: cloud: stream: bindings: output: destination: orgChangeTopic content-type: application/json kafka: binder: # 替换为部署kafka的ip和端口 zk-nodes:192.168.226.5:2181 brokers: 192.168.226.5:9092
在许可证服务中编写消息消费者
@EnableBinding(Sink.class) //使用Sink接口中定义的通道来监听传入消息public class OrgChange { privateLogger logger = LoggerFactory.getLogger(OrgChange.class); @StreamListener(Sink.INPUT) public void loggerSink(OrganizationChange change){ logger.info("收到一个消息,组织id为:{},关联id为:{}",change.getOrgId(),change.getId()); //删除失效缓存 RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId())); }}//下面两个都在util包下//RedisKeyUtils.java代码如下public class RedisKeyUtils { private static final String ORG_CACHE_PREFIX = "orgCache_"; public static String getOrgCacheKey(String orgId){ return ORG_CACHE_PREFIX+orgId; }}//RedisUtils.java代码如下@Component@SuppressWarnings("all")public class RedisUtils { public staticRedisTemplate redisTemplate; @Autowired public void setRedisTemplate(RedisTemplate redisTemplate) { RedisUtils.redisTemplate = redisTemplate; } public static boolean setObj(String key,Object value){ return setObj(key,value,0); } /** * Description: * * @author fanxb * @date 2019/2/21 15:21 * @param key 键 * @param value 值 * @param time 过期时间,单位ms * @return boolean 是否成功 */ public static boolean setObj(String key,Object value,long time){ try{ if(time<=0){ redisTemplate.opsForValue().set(key,value); }else{ redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS); } return true; }catch (Exception e){ e.printStackTrace(); return false; } } public static Object get(String key){ if(key==null){ return null; } try{ Object obj = redisTemplate.opsForValue().get(key); return obj; }catch (Exception e){ e.printStackTrace(); return null; } } public static void del(String... key){ if(key!=null && key.length>0){ redisTemplate.delete(CollectionUtils.arrayToList(key)); } }}
public Organization getOrganizationWithRibbon(String id) { String key = RedisKeyUtils.getOrgCacheKey(id); //先从redis缓存取数据 Object res = RedisUtils.get(key); if (res == null) { logger.info("当前数据无缓存:{}", id); try{ ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}", HttpMethod.GET, null, Organization.class, id); res = responseEntity.getBody(); RedisUtils.setObj(key, res); }catch (Exception e){ e.printStackTrace(); } } else { logger.info("当前数据为缓存数据:{}", id); } return (Organization) res; }
spring: cloud: stream: bindings: input: destination: orgChangeTopic content-type:application/json # 定义将要消费消息的消费者组的名称 # 可能多个服务监听同一个消息队列。如果定义了消费者组,那么同组中只要有一个消费了消息,剩余的不会再次消费该消息,保证只有消息的 # 一个副本会被该组的某个实例所消费 group: licensingGroup kafka: binder: zk-nodes:192.168.226.5:2181 brokers: 192.168.226.5:9092
自定义通道
自定义发数据通道public interface CustomOutput { @Output("customOutput") MessageChannel out();}
自定义收数据通道public interface CustomInput { @Input("customInput") SubscribableChannel in();}
结束