一、问题描述                                                                                        

    在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中哪?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。

  • 服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决
  • 服务“Load Balance",我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法
  • 服务调用端的对象池化,我们可以使用common pool,使用简单又可以满足我们的需求   

二、实现思路                                                                                        

    1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)

        thrift server端可以单机多端口多实例或多机部署多实例方式运行。

   2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。

   3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。

                 由浅入深了解Thrift之客户端连接池化

三、具体实现                                                                                       

   1、thrift server端   

      thrift server端,向zk中注册server address

package com.wy.thriftpool.commzkpool;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;

import com.wy.thrift.service.UserService.Processor;
import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;
import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;

/**
 * thrift server端,向zk中注册server address
 * 
 * @author wy
 * 
 */
public class ThriftServiceServerFactory implements InitializingBean {

    // thrift server 服务端口
    private Integer port;
    // default 权重
    private Integer priority = 1;
    // service实现类
    private Object service;
    // thrift server 注册路径
    private String configPath;

    private ThriftServerIpTransfer ipTransfer;
    // thrift server注册类
    private ThriftServerAddressReporter addressReporter;
    // thrift server开启服务
    private ServerThread serverThread;

    @Override
    public void afterPropertiesSet() throws Exception {
        if (ipTransfer == null) {
            ipTransfer = new LocalNetworkIpTransfer();
        }
        String ip = ipTransfer.getIp();
        if (ip == null) {
            throw new NullPointerException("cant find server ip...");
        }
        String hostname = ip + ":" + port + ":" + priority;
        Class<? extends Object> serviceClass = service.getClass();
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }

        // reflect,load "Processor";
        Processor<?> processor = null;
        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            String pname = clazz.getEnclosingClass().getName() + "$Processor";
            try {
                Class<?> pclass = classLoader.loadClass(pname);
                if (!pclass.isAssignableFrom(Processor.class)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (Processor<?>) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                // TODO
            }
        }

        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // 需要单独的线程,因为serve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // report
        if (addressReporter != null) {
            addressReporter.report(configPath, hostname);
        }
    }

    class ServerThread extends Thread {
        private TServer server;

        ServerThread(Processor<?> processor, int port) throws Exception {
            // 设置传输通道
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            // 设置二进制协议
            Factory protocolFactory = new TBinaryProtocol.Factory();
            
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            tArgs.processor(processor);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(protocolFactory);
            int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
            tArgs.selectorThreads(num);
            tArgs.workerThreads(num * 10);
            
            // 网络服务模型
            server = new TThreadedSelectorServer(tArgs);
            
        }

        @Override
        public void run() {
            try {
                server.serve();
            } catch (Exception e) {
                //TODO
            }
        }

        public void stopServer() {
            server.stop();
        }
    }

    public void close() {
        serverThread.stopServer();
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setPriority(Integer priority) {
        this.priority = priority;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
        this.ipTransfer = ipTransfer;
    }

    public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
        this.addressReporter = addressReporter;
    }

    public void setConfigPath(String configPath) {
        this.configPath = configPath;
    }
}
View Code

相关文章:

  • 2022-12-23
  • 2021-05-24
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-06-26
  • 2019-12-14
  • 2021-08-14
猜你喜欢
  • 2022-12-23
  • 2021-11-05
  • 2022-12-23
  • 2022-12-23
  • 2021-11-13
  • 2021-06-10
  • 2021-05-22
相关资源
相似解决方案