RPC框架

      RPC是Remote Procedure Call的缩写,像Client-Servier一样的远程过程调用,也就是调用远程服务就跟调用本地服务一样方便,一般用于将程序部署在不同的机器上,供客户端进行调用。就像一个request-response调用系统一样简单。

      RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

RPC实现

     本次实例,通信使用Netty,注册中心使用zookeeper。由服务端向zookeeper注册要暴露的接口(包括接口所在的服务器的ip和端口,相当于key为接口名,value为ip+端口号),启动NettyServer服务,等待NettyClient的连接;客户端使用动态代理的方式,在接口调用时,获取到方法的调用详情,根据调用的接口名,通过zookeeper获取到接口所在的服务器的ip+端口号,NeetyClient连接到NettyServer,然后将方法的调用详情通过Netty通信发送到NettyServer上,由Server使用Java反射调用响应接口的实现类获取到返回值,并将发返回值发送回NettyClient,再通过动态代理将返回值返回给方法的调用者。

简单实现RPC框架(一)服务端的实现

     1. 添加依赖

    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.8</version>
    </dependency>
    <dependency>
      <groupId>org.reflections</groupId>
      <artifactId>reflections</artifactId>
      <version>0.9.11</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.10.Final</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.38</version>
    </dependency>

     2. 暴露的接口和接口实现类

/*
 * @author uv
 * @date 2018/10/12 8:56
 * 为了方便,这里就不把接口打成jar包调用了
 */
public interface UserService {

    String sayHello(String name);

}

 

/*
 * @author uv
 * @date 2018/10/12 8:56
 * 接口实现类
 */

import com.uv.rpc.annotation.RpcService;
import com.uv.api.UserService;

@RpcService(UserService.class)
public class UserServceImpl implements UserService{

    @Override
    public String sayHello(String name) {
        return "Hello " + name;
    }
}

     3.  zookeeper连接类,用于服务启动时连接到zookeeper,并在zookeeper添加根节点/rpc/,并在跟节点下添加暴露接口的子节点

/*
 * @author uv
 * @date 2018/10/11 15:56
 * zookeeper 工具类(服务发现)
 * zookeeper存储结构为目录层级结构
 */

import java.io.IOException;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZookeeperUtil {

    //zookeeper连接超时时间
    private static final int SESSION_TIMEOUT = 5000;
    //在zookeeper注册的根节点
    private static final String ROOT = "/rpc";

    private static ZooKeeper zooKeeper = null;

    /**
     * @param ip zookeeper服务的IP地址
     * @param port 端口号 连接zookeeper服务
     * 连接zookeeper
     */
    public static void connect(String ip, int port) {
        try {
            zooKeeper = new ZooKeeper(ip + ":" + port, SESSION_TIMEOUT, null);
        } catch (IOException e) {
            System.out.println("zookeeper连接失败");
            e.printStackTrace();
        }
    }

    /**
     * @param ip provider服务的ip
     * 在zookeeper中注册暴漏的接口
     */
    public static void register(String ip, int port) {

        if(zooKeeper == null) {
            throw new RuntimeException("zookeeper未连接");
        }
        try {
            List<String> interfaceList = ScanUtil.interfaceList;
            if (interfaceList.size() == 0) {
                System.out.println("没有可暴露的接口");
                return;
            }
            /**
             * CreateMode:
             *     PERSISTENT (持续的,相对于EPHEMERAL,不会随着client的断开而消失)
             *     PERSISTENT_SEQUENTIAL(持久的且带顺序的)
             *     EPHEMERAL (临时的,生命周期依赖于client session)
             *     EPHEMERAL_SEQUENTIAL  (临时的,带顺序的)
             */
            //如果根节点不存在,创建永久(PERSISTENT)根节点,临时节点下不能创建子节点
            if (zooKeeper.exists(ROOT, true) == null) {
                zooKeeper.create(ROOT, "rpc".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            for (String intf : interfaceList) {
                String path = ROOT + "/" + intf;
                Stat exists = zooKeeper.exists(path, true);
                //节点不存在,创建临时(EPHEMERAL)节点
                if (exists == null) {
                    zooKeeper.create(ROOT + "/" + intf, (ip + ":" + port).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } else { //修改节点,如果 version 为 -1 则可以匹配任何版本
                    zooKeeper.setData(ROOT+ "/" + intf, (ip + ":" + port).getBytes(), -1);
                }
                System.out.println("zookeeper创建节点:" + intf + "成功");
            }
        } catch (Exception e) {
            System.out.println("zookeeper创建节点失败");
            e.printStackTrace();
        }
    }
}

    3. 采用注解的方式暴露接口,用在暴露接口实现类的注解@RpcService,value指向暴露的接口

/*
 * @author uv
 * @date 2018/10/11 18:31
 * 暴露的接口
 */

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcService {
    //实现类实现的暴露的接口
    Class<?> value();
}

    4. 扫描带有@RpcService注解的类(即需要暴露的接口和接口的实现类,并形成对应关系并保存,在连接到zookeeper时,将interfaceList中的接口添加到zookeeper节点上;interfaceMap在NettyClient发送过接口的请求时,根据接口名,获取到杜对应的实现类) 

/*
 * @author uv
 * @date 2018/10/11 18:42
 * 扫描注解
 */

import com.uv.rpc.annotation.RpcService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.reflections.Reflections;

public class ScanUtil {

    //暴露服务的接口
    public static List<String> interfaceList = new ArrayList<>();
    //暴露接口和实现类的映射关系(key=接口,value=接口实现类)
    public static Map<String, Class<?>> interfaceClassMap = new ConcurrentHashMap<>();

    //扫描类
    public static void scanPath(String path) {
        Reflections reflections = new Reflections(path);
        //扫描带有@RpcService注解的类
        Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(RpcService.class);
        for (Class<?> clazz : annotated) {
            RpcService rpcService = clazz.getAnnotation(RpcService.class);
            //暴露服务的接口
            interfaceList.add(rpcService.value().getName());
            //暴露接口和实现类的映射关系
            interfaceClassMap.put(rpcService.value().getName(), clazz);
        }
    }
}

    5.  NettyServer的实现(关于Netty的用法,可以点这里:https://blog.csdn.net/qq_22200097/article/details/83042424

/*
 * @author uv
 * @date 2018/10/12 18:25
 * 服务端
 */

import com.uv.rpc.netty.protocol.RpcDecoder;
import com.uv.rpc.netty.protocol.RpcEncoder;
import com.uv.rpc.netty.protocol.RpcRequest;
import com.uv.rpc.netty.protocol.RpcResponse;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public void bind(int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件

        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128
            .childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接
            .childHandler(new ChannelInitializer<SocketChannel>() {  // 绑定客户端连接时候触发操作
                @Override
                protected void initChannel(SocketChannel sh) throws Exception {
                    sh.pipeline()
                        .addLast(new RpcDecoder(RpcRequest.class)) //解码request
                        .addLast(new RpcEncoder(RpcResponse.class)) //编码response
                        .addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息
                }
            });
        //绑定监听端口,调用sync同步阻塞方法等待绑定操作完成,完成后返回ChannelFuture类似于JDK中Future
        try {
            ChannelFuture future = sb.bind(port).sync();

            if (future.isSuccess()) {
                System.out.println("服务端启动成功");
            } else {
                System.out.println("服务端启动失败");
                future.cause().printStackTrace();
                bossGroup.shutdownGracefully(); //关闭线程组
                workerGroup.shutdownGracefully();
            }

            //成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            System.out.println("服务端启动失败");
            e.printStackTrace();
            System.exit(0);
        }
    }
}

     6.  ServerHandler,服务端消息处理类,用于处理客户端发送来的请求,通过反射调用接口实现类,返回客户端返回值

/*
 * @author uv
 * @date 2018/10/12 18:33
 * 处理服务端接收的数据
 */

import com.uv.rpc.discovery.ScanUtil;
import com.uv.rpc.netty.protocol.RpcRequest;
import com.uv.rpc.netty.protocol.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;

public class ServerHandler extends ChannelInboundHandlerAdapter{


    //接受client发送的消息并处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        RpcRequest request = (RpcRequest) msg;
        //返回的数据结构
        RpcResponse response = new RpcResponse();
        response.setId(request.getId()); //返回ID与request对应
        try {
            //执行相应的方法
            Object result = invokeMethod(request);
            response.setData(result);
            response.setStatus(1);
        } catch (Exception e) {
            e.printStackTrace();
            response.setData(null);
            response.setStatus(-1);
        }
        //返回执行结果
        ctx.writeAndFlush(response);
    }

    //通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    //读操作时捕获到异常时调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

    //客户端去和服务端连接成功时触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush("hello client");
    }

    //根据请求,通过Java反射的方式执行相应的方法
    private Object invokeMethod(RpcRequest request) throws Exception{

        String className = request.getClassName();
        //根据暴露的接口,找到实现类
        Class<?> clazz = ScanUtil.interfaceClassMap.get(className);

        //找到要执行的方法
        Method method = clazz.getDeclaredMethod(request.getMethodName(), request.getParameterTypes());
        //执行
        Object result = method.invoke(clazz.newInstance(), request.getParameters());
        return result;
    }
}

     7.  Netty传输时的实体类,包括要返回的数据,执行状态。(request包括远程调用接口的方法的详细信息,方便服务端进行Java反射)

/*
 * @author uv
 * @date 2018/10/13 18:10
 * 传输请求对象
 */

import java.io.Serializable;

public class RpcRequest implements Serializable{

    private static final long serialVersionUID = -4558182507809817096L;

    private String id; //消息ID
    private String className; //远程调用的类
    private String methodName; //远程调用的方法
    private Class<?>[] parameterTypes; //方法参数类型列表
    private Object[] parameters; //方法参数

    public String getId() {
        return id;
    }

    public RpcRequest setId(String id) {
        this.id = id;
        return this;
    }

    public String getClassName() {
        return className;
    }

    public RpcRequest setClassName(String className) {
        this.className = className;
        return this;
    }

    public String getMethodName() {
        return methodName;
    }

    public RpcRequest setMethodName(String methodName) {
        this.methodName = methodName;
        return this;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public RpcRequest setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
        return this;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public RpcRequest setParameters(Object[] parameters) {
        this.parameters = parameters;
        return this;
    }
}
/*
 * @author uv
 * @date 2018/10/13 18:10
 * 传输响应对象
 */

import java.io.Serializable;

public class RpcResponse implements Serializable{

    private static final long serialVersionUID = 1147496058100401100L;

    private String id; //消息体ID
    private Object data; //方法执行返回值
    // 0=success -1=fail
    private int status; //执行状态

    public String getId() {
        return id;
    }

    public RpcResponse setId(String id) {
        this.id = id;
        return this;
    }

    public Object getData() {
        return data;
    }

    public RpcResponse setData(Object data) {
        this.data = data;
        return this;
    }

    public int getStatus() {
        return status;
    }

    public RpcResponse setStatus(int status) {
        this.status = status;
        return this;
    }
}

     8.  实体类在传输时,需要进行编码和解码(序列化)才能进行通信,此处定义通讯时的编码器可解码器,此处使用fastJson进行序列化和反序列化,可采用其它方式的进行

/*
 * @author uv
 * @date 2018/10/13 18:09
 * 编码器(将实体类转换成可传输的数据)
 */

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class RpcEncoder extends MessageToByteEncoder {

    //目标对象类型进行编码
    private Class<?> target;

    public RpcEncoder(Class target) {
        this.target = target;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
        if (target.isInstance(msg)) {
            byte[] data = JSON.toJSONBytes(msg); //使用fastJson将对象转换为byte
            out.writeInt(data.length); //先将消息长度写入,也就是消息头
            out.writeBytes(data); //消息体中包含我们要发送的数据
        }
    }
}
/*
 * @author uv
 * @date 2018/10/13 18:09
 * 解码器(将接收的数据转换成实体类)
 */

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class RpcDecoder extends ByteToMessageDecoder {

    //目标对象类型进行解码
    private Class<?> target;

    public RpcDecoder(Class target) {
        this.target = target;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) { //不够长度丢弃
            return;
        }
        in.markReaderIndex(); //标记一下当前的readIndex的位置
        int dataLength = in.readInt(); // 读取传送过来的消息的长度。ByteBuf 的readInt()方法会让他的readIndex增加4

        if (in.readableBytes() < dataLength) { //读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex. 这个配合markReaderIndex使用的。把readIndex重置到mark的地方
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = JSON.parseObject(data, target); //将byte数据转化为我们需要的对象
        out.add(obj);
    }
}

   9. 使用windows版本的zookeeper,进入到bin目录下,执行zkServer.cmd启动

   10. Server端启动

import com.uv.rpc.discovery.ScanUtil;
import com.uv.rpc.discovery.ZookeeperUtil;
import com.uv.rpc.netty.server.NettyServer;

/**
 * <uv> [2018/10/14 16:55]
 */
public class Main {

    //server启动监听的端口
    public static final int serverPort = 8018;

    public static void main(String[] args) {

        //扫描暴露的接口
        ScanUtil.scanPath("com.uv.api");
        //连接zookeeper
        ZookeeperUtil.connect("127.0.0.1", 2181);
        //向zookeeper注册暴露的接口
        ZookeeperUtil.register("127.0.0.1", serverPort);
        //启动server(此过程放在最后,启动服务后,此方法下的不再执行)
        new NettyServer().bind(serverPort);
    }
}

    11. 启动结果 

简单实现RPC框架(一)服务端的实现

   可以看到成功在zookeeper上添加节点,CMD进入到zookeeper的bin目录下,执行zkCli.cmd,通过zookeeper客户端查看服务端添加节点的情况。执行(ls /),可以看到zookeeper的根节点有两个:zookeeper和rpc,其中rpc为Server端创建的根节点,执行(ls /rpc),可以看到根节点下已经添加了接口名的节点,执行(get /rpc/com.uv.api.UserService),查看节点信息,可以看到注册的Server端的ip+端口号。

简单实现RPC框架(一)服务端的实现 

github源码地址:https://github.com/UVliuwei/rpc

相关文章: