RPC框架
RPC是Remote Procedure Call的缩写,像Client-Servier一样的远程过程调用,也就是调用远程服务就跟调用本地服务一样方便,一般用于将程序部署在不同的机器上,供客户端进行调用。就像一个request-response调用系统一样简单。
RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
RPC实现
本次实例,通信使用Netty,注册中心使用zookeeper。由服务端向zookeeper注册要暴露的接口(包括接口所在的服务器的ip和端口,相当于key为接口名,value为ip+端口号),启动NettyServer服务,等待NettyClient的连接;客户端使用动态代理的方式,在接口调用时,获取到方法的调用详情,根据调用的接口名,通过zookeeper获取到接口所在的服务器的ip+端口号,NeetyClient连接到NettyServer,然后将方法的调用详情通过Netty通信发送到NettyServer上,由Server使用Java反射调用响应接口的实现类获取到返回值,并将发返回值发送回NettyClient,再通过动态代理将返回值返回给方法的调用者。
1. 添加依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.11</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.10.Final</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.38</version>
</dependency>
2. 暴露的接口和接口实现类
/*
* @author uv
* @date 2018/10/12 8:56
* 为了方便,这里就不把接口打成jar包调用了
*/
public interface UserService {
String sayHello(String name);
}
/*
* @author uv
* @date 2018/10/12 8:56
* 接口实现类
*/
import com.uv.rpc.annotation.RpcService;
import com.uv.api.UserService;
@RpcService(UserService.class)
public class UserServceImpl implements UserService{
@Override
public String sayHello(String name) {
return "Hello " + name;
}
}
3. zookeeper连接类,用于服务启动时连接到zookeeper,并在zookeeper添加根节点/rpc/,并在跟节点下添加暴露接口的子节点
/*
* @author uv
* @date 2018/10/11 15:56
* zookeeper 工具类(服务发现)
* zookeeper存储结构为目录层级结构
*/
import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class ZookeeperUtil {
//zookeeper连接超时时间
private static final int SESSION_TIMEOUT = 5000;
//在zookeeper注册的根节点
private static final String ROOT = "/rpc";
private static ZooKeeper zooKeeper = null;
/**
* @param ip zookeeper服务的IP地址
* @param port 端口号 连接zookeeper服务
* 连接zookeeper
*/
public static void connect(String ip, int port) {
try {
zooKeeper = new ZooKeeper(ip + ":" + port, SESSION_TIMEOUT, null);
} catch (IOException e) {
System.out.println("zookeeper连接失败");
e.printStackTrace();
}
}
/**
* @param ip provider服务的ip
* 在zookeeper中注册暴漏的接口
*/
public static void register(String ip, int port) {
if(zooKeeper == null) {
throw new RuntimeException("zookeeper未连接");
}
try {
List<String> interfaceList = ScanUtil.interfaceList;
if (interfaceList.size() == 0) {
System.out.println("没有可暴露的接口");
return;
}
/**
* CreateMode:
* PERSISTENT (持续的,相对于EPHEMERAL,不会随着client的断开而消失)
* PERSISTENT_SEQUENTIAL(持久的且带顺序的)
* EPHEMERAL (临时的,生命周期依赖于client session)
* EPHEMERAL_SEQUENTIAL (临时的,带顺序的)
*/
//如果根节点不存在,创建永久(PERSISTENT)根节点,临时节点下不能创建子节点
if (zooKeeper.exists(ROOT, true) == null) {
zooKeeper.create(ROOT, "rpc".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
for (String intf : interfaceList) {
String path = ROOT + "/" + intf;
Stat exists = zooKeeper.exists(path, true);
//节点不存在,创建临时(EPHEMERAL)节点
if (exists == null) {
zooKeeper.create(ROOT + "/" + intf, (ip + ":" + port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} else { //修改节点,如果 version 为 -1 则可以匹配任何版本
zooKeeper.setData(ROOT+ "/" + intf, (ip + ":" + port).getBytes(), -1);
}
System.out.println("zookeeper创建节点:" + intf + "成功");
}
} catch (Exception e) {
System.out.println("zookeeper创建节点失败");
e.printStackTrace();
}
}
}
3. 采用注解的方式暴露接口,用在暴露接口实现类的注解@RpcService,value指向暴露的接口
/*
* @author uv
* @date 2018/10/11 18:31
* 暴露的接口
*/
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcService {
//实现类实现的暴露的接口
Class<?> value();
}
4. 扫描带有@RpcService注解的类(即需要暴露的接口和接口的实现类,并形成对应关系并保存,在连接到zookeeper时,将interfaceList中的接口添加到zookeeper节点上;interfaceMap在NettyClient发送过接口的请求时,根据接口名,获取到杜对应的实现类)
/*
* @author uv
* @date 2018/10/11 18:42
* 扫描注解
*/
import com.uv.rpc.annotation.RpcService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.reflections.Reflections;
public class ScanUtil {
//暴露服务的接口
public static List<String> interfaceList = new ArrayList<>();
//暴露接口和实现类的映射关系(key=接口,value=接口实现类)
public static Map<String, Class<?>> interfaceClassMap = new ConcurrentHashMap<>();
//扫描类
public static void scanPath(String path) {
Reflections reflections = new Reflections(path);
//扫描带有@RpcService注解的类
Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(RpcService.class);
for (Class<?> clazz : annotated) {
RpcService rpcService = clazz.getAnnotation(RpcService.class);
//暴露服务的接口
interfaceList.add(rpcService.value().getName());
//暴露接口和实现类的映射关系
interfaceClassMap.put(rpcService.value().getName(), clazz);
}
}
}
5. NettyServer的实现(关于Netty的用法,可以点这里:https://blog.csdn.net/qq_22200097/article/details/83042424)
/*
* @author uv
* @date 2018/10/12 18:25
* 服务端
*/
import com.uv.rpc.netty.protocol.RpcDecoder;
import com.uv.rpc.netty.protocol.RpcEncoder;
import com.uv.rpc.netty.protocol.RpcRequest;
import com.uv.rpc.netty.protocol.RpcResponse;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public void bind(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的
EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件
ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接
.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel sh) throws Exception {
sh.pipeline()
.addLast(new RpcDecoder(RpcRequest.class)) //解码request
.addLast(new RpcEncoder(RpcResponse.class)) //编码response
.addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息
}
});
//绑定监听端口,调用sync同步阻塞方法等待绑定操作完成,完成后返回ChannelFuture类似于JDK中Future
try {
ChannelFuture future = sb.bind(port).sync();
if (future.isSuccess()) {
System.out.println("服务端启动成功");
} else {
System.out.println("服务端启动失败");
future.cause().printStackTrace();
bossGroup.shutdownGracefully(); //关闭线程组
workerGroup.shutdownGracefully();
}
//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
future.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println("服务端启动失败");
e.printStackTrace();
System.exit(0);
}
}
}
6. ServerHandler,服务端消息处理类,用于处理客户端发送来的请求,通过反射调用接口实现类,返回客户端返回值
/*
* @author uv
* @date 2018/10/12 18:33
* 处理服务端接收的数据
*/
import com.uv.rpc.discovery.ScanUtil;
import com.uv.rpc.netty.protocol.RpcRequest;
import com.uv.rpc.netty.protocol.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
public class ServerHandler extends ChannelInboundHandlerAdapter{
//接受client发送的消息并处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcRequest request = (RpcRequest) msg;
//返回的数据结构
RpcResponse response = new RpcResponse();
response.setId(request.getId()); //返回ID与request对应
try {
//执行相应的方法
Object result = invokeMethod(request);
response.setData(result);
response.setStatus(1);
} catch (Exception e) {
e.printStackTrace();
response.setData(null);
response.setStatus(-1);
}
//返回执行结果
ctx.writeAndFlush(response);
}
//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//读操作时捕获到异常时调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
//客户端去和服务端连接成功时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush("hello client");
}
//根据请求,通过Java反射的方式执行相应的方法
private Object invokeMethod(RpcRequest request) throws Exception{
String className = request.getClassName();
//根据暴露的接口,找到实现类
Class<?> clazz = ScanUtil.interfaceClassMap.get(className);
//找到要执行的方法
Method method = clazz.getDeclaredMethod(request.getMethodName(), request.getParameterTypes());
//执行
Object result = method.invoke(clazz.newInstance(), request.getParameters());
return result;
}
}
7. Netty传输时的实体类,包括要返回的数据,执行状态。(request包括远程调用接口的方法的详细信息,方便服务端进行Java反射)
/*
* @author uv
* @date 2018/10/13 18:10
* 传输请求对象
*/
import java.io.Serializable;
public class RpcRequest implements Serializable{
private static final long serialVersionUID = -4558182507809817096L;
private String id; //消息ID
private String className; //远程调用的类
private String methodName; //远程调用的方法
private Class<?>[] parameterTypes; //方法参数类型列表
private Object[] parameters; //方法参数
public String getId() {
return id;
}
public RpcRequest setId(String id) {
this.id = id;
return this;
}
public String getClassName() {
return className;
}
public RpcRequest setClassName(String className) {
this.className = className;
return this;
}
public String getMethodName() {
return methodName;
}
public RpcRequest setMethodName(String methodName) {
this.methodName = methodName;
return this;
}
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
public RpcRequest setParameterTypes(Class<?>[] parameterTypes) {
this.parameterTypes = parameterTypes;
return this;
}
public Object[] getParameters() {
return parameters;
}
public RpcRequest setParameters(Object[] parameters) {
this.parameters = parameters;
return this;
}
}
/*
* @author uv
* @date 2018/10/13 18:10
* 传输响应对象
*/
import java.io.Serializable;
public class RpcResponse implements Serializable{
private static final long serialVersionUID = 1147496058100401100L;
private String id; //消息体ID
private Object data; //方法执行返回值
// 0=success -1=fail
private int status; //执行状态
public String getId() {
return id;
}
public RpcResponse setId(String id) {
this.id = id;
return this;
}
public Object getData() {
return data;
}
public RpcResponse setData(Object data) {
this.data = data;
return this;
}
public int getStatus() {
return status;
}
public RpcResponse setStatus(int status) {
this.status = status;
return this;
}
}
8. 实体类在传输时,需要进行编码和解码(序列化)才能进行通信,此处定义通讯时的编码器可解码器,此处使用fastJson进行序列化和反序列化,可采用其它方式的进行
/*
* @author uv
* @date 2018/10/13 18:09
* 编码器(将实体类转换成可传输的数据)
*/
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class RpcEncoder extends MessageToByteEncoder {
//目标对象类型进行编码
private Class<?> target;
public RpcEncoder(Class target) {
this.target = target;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (target.isInstance(msg)) {
byte[] data = JSON.toJSONBytes(msg); //使用fastJson将对象转换为byte
out.writeInt(data.length); //先将消息长度写入,也就是消息头
out.writeBytes(data); //消息体中包含我们要发送的数据
}
}
}
/*
* @author uv
* @date 2018/10/13 18:09
* 解码器(将接收的数据转换成实体类)
*/
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class RpcDecoder extends ByteToMessageDecoder {
//目标对象类型进行解码
private Class<?> target;
public RpcDecoder(Class target) {
this.target = target;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) { //不够长度丢弃
return;
}
in.markReaderIndex(); //标记一下当前的readIndex的位置
int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4
if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
Object obj = JSON.parseObject(data, target); //将byte数据转化为我们需要的对象
out.add(obj);
}
}
9. 使用windows版本的zookeeper,进入到bin目录下,执行zkServer.cmd启动
10. Server端启动
import com.uv.rpc.discovery.ScanUtil;
import com.uv.rpc.discovery.ZookeeperUtil;
import com.uv.rpc.netty.server.NettyServer;
/**
* <uv> [2018/10/14 16:55]
*/
public class Main {
//server启动监听的端口
public static final int serverPort = 8018;
public static void main(String[] args) {
//扫描暴露的接口
ScanUtil.scanPath("com.uv.api");
//连接zookeeper
ZookeeperUtil.connect("127.0.0.1", 2181);
//向zookeeper注册暴露的接口
ZookeeperUtil.register("127.0.0.1", serverPort);
//启动server(此过程放在最后,启动服务后,此方法下的不再执行)
new NettyServer().bind(serverPort);
}
}
11. 启动结果
可以看到成功在zookeeper上添加节点,CMD进入到zookeeper的bin目录下,执行zkCli.cmd,通过zookeeper客户端查看服务端添加节点的情况。执行(ls /),可以看到zookeeper的根节点有两个:zookeeper和rpc,其中rpc为Server端创建的根节点,执行(ls /rpc),可以看到根节点下已经添加了接口名的节点,执行(get /rpc/com.uv.api.UserService),查看节点信息,可以看到注册的Server端的ip+端口号。
github源码地址:https://github.com/UVliuwei/rpc