前文《由浅入深了解Thrift之客户端连接池化》中我们已经实现了服务调用端连接的池化,但实现得过于简陋,离实际的项目运用还很遥远。本文将做进一步改造,主要是两方面:1、服务端如何注册多个服务;2、调用端如何获取服务对象而不是服务连接。

一、实现思路

    1、通过spring配置文件,配置服务类

    2、反射生成服务类实例,依次注册服务

    调用端获取服务对象亦是如此,废话不多说了

二、主要实现

   1、服务端

/**
 * Thrift server factory that publishes several service implementations on a
 * single port and, when an address reporter is configured, reports the server
 * address (for example to ZooKeeper).
 *
 * <p>Each configured service object must directly implement a Thrift-generated
 * {@code Xxx.Iface} interface. The matching generated {@code Xxx.Processor} is
 * located by reflection and registered on a {@link TMultiplexedProcessor}
 * under the service's map key.
 *
 * @author wy
 */
public class ThriftServiceServerFactory implements InitializingBean {
    private final Logger logger = LoggerFactory.getLogger(getClass());

    // Port the thrift server listens on.
    private Integer port;
    // Weight appended to the reported address; defaults to 1.
    private Integer priority = 1;
    // Service name -> service implementation object.
    private Map<String, Object> services;
    // Path under which the server address is reported.
    private String configPath;
    // Thrift service module name; not used by this class itself.
    private String configService;

    // Resolves the IP that is reported; defaults to LocalNetworkIpTransfer.
    private ThriftServerIpTransfer ipTransfer;
    // Reports the server address; optional.
    private ThriftServerAddressReporter addressReporter;
    // Thread that runs the blocking thrift server.
    private ServerThread serverThread;

    /**
     * Builds a processor for every configured service, starts the thrift
     * server on its own thread and then reports the server address.
     *
     * @throws NullPointerException if the port, the services map, a service
     *         implementation or the server IP is missing
     * @throws IllegalClassFormatException if a service object does not
     *         directly implement a generated {@code Iface} interface
     * @throws Exception if reflection, socket setup or address reporting fails
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // Fail fast with a clear message instead of a bare NPE further down.
        if (port == null) {
            throw new NullPointerException("thrift server port is not configured");
        }
        if (services == null) {
            throw new NullPointerException("thrift services are not configured");
        }
        if (ipTransfer == null) {
            ipTransfer = new LocalNetworkIpTransfer();
        }
        String ip = ipTransfer.getIp();
        if (ip == null) {
            throw new NullPointerException("cant find server ip...");
        }
        // Reported address format: ip:port:priority
        String hostName = ip + ":" + port + ":" + priority;
        logger.info("registry services address = {}", hostName);
        logger.info("registry services count = {}", services.size());

        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        Map<String, TProcessor> processors = new HashMap<String, TProcessor>();
        for (Entry<String, Object> entry : services.entrySet()) {
            String serviceName = entry.getKey();
            processors.put(serviceName,
                    createProcessor(serviceName, entry.getValue(), classLoader));
        }

        // serve() blocks, so the server needs a thread of its own.
        serverThread = new ServerThread(processors, port);
        serverThread.start();
        // Report the server address (e.g. register it in ZK).
        if (addressReporter != null) {
            addressReporter.report(configPath, hostName);
        }
    }

    /**
     * Creates the generated {@code Processor} for one service implementation.
     *
     * <p>Only interfaces implemented directly by the object's class are
     * inspected, because {@link Class#getInterfaces()} does not include
     * interfaces inherited from a superclass.
     */
    private TProcessor createProcessor(String serviceName, Object serviceImpl,
            ClassLoader classLoader) throws Exception {
        if (serviceImpl == null) {
            throw new NullPointerException("service implementation is null: " + serviceName);
        }
        Class<?>[] interfaces = serviceImpl.getClass().getInterfaces();
        logger.info("service Interfaces count = {}", interfaces.length);
        for (Class<?> interfaceClazz : interfaces) {
            // Generated service interfaces are nested types named "Iface".
            Class<?> enclosing = interfaceClazz.getEnclosingClass();
            if (enclosing == null || !"Iface".equals(interfaceClazz.getSimpleName())) {
                continue;
            }
            // The generated processor is a sibling nested class: Outer$Processor.
            Class<?> pclass = classLoader.loadClass(enclosing.getName() + "$Processor");
            // The loaded class must itself be a TProcessor, otherwise the cast below fails.
            if (!TProcessor.class.isAssignableFrom(pclass)) {
                continue;
            }
            Constructor<?> constructor = pclass.getConstructor(interfaceClazz);
            return (TProcessor) constructor.newInstance(serviceImpl);
        }
        throw new IllegalClassFormatException("service-class should implements Iface");
    }

    /**
     * Thread that owns the thrift server and runs its blocking serve loop.
     */
    class ServerThread extends Thread {
        private TServer server;

        /**
         * Configures a {@link TThreadedSelectorServer} that multiplexes all
         * given processors over one non-blocking socket.
         *
         * @param processors service name -> processor to register
         * @param port port to listen on
         * @throws Exception if the server socket cannot be created
         */
        ServerThread(Map<String, TProcessor> processors, int port) throws Exception {
            // Transport channel.
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(
                    port);
            // Binary protocol.
            Factory protocolFactory = new TBinaryProtocol.Factory();
            // Network service model.
            TThreadedSelectorServer.Args tssArgs = new TThreadedSelectorServer.Args(
                    serverTransport);

            // One multiplexed processor dispatches to each service by name.
            TMultiplexedProcessor processor = new TMultiplexedProcessor();
            for (Entry<String, TProcessor> entry : processors.entrySet()) {
                processor.registerProcessor(entry.getKey(), entry.getValue());
            }
            // Set once, outside the loop, so it is also set when no service is registered.
            tssArgs.processor(processor);

            tssArgs.protocolFactory(protocolFactory);
            // Framed transport on the server side; clients are expected to use it too.
            tssArgs.transportFactory(new TFramedTransport.Factory());
            int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
            tssArgs.selectorThreads(num);
            tssArgs.workerThreads(num * 10);

            server = new TThreadedSelectorServer(tssArgs);
        }

        /** Runs the blocking serve loop and logs any failure that ends it. */
        @Override
        public void run() {
            try {
                server.serve();
            } catch (Exception e) {
                logger.error("thrift server terminated unexpectedly", e);
            }
        }

        /** Stops the thrift server if it is currently serving. */
        public void stopServer() {
            if (server != null && server.isServing()) {
                server.stop();
            }
        }
    }

    /**
     * Stops the thrift server. Does nothing if the server was never started.
     */
    public void close() {
        if (serverThread != null) {
            serverThread.stopServer();
        }
    }

    public void setServices(Map<String, Object> services) {
        this.services = services;
    }

    public void setPriority(Integer priority) {
        this.priority = priority;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
        this.ipTransfer = ipTransfer;
    }

    public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
        this.addressReporter = addressReporter;
    }

    public void setConfigPath(String configPath) {
        this.configPath = configPath;
    }

    public String getConfigService() {
        return configService;
    }

    public void setConfigService(String configService) {
        this.configService = configService;
    }

}
View Code

相关文章:

  • 2022-12-23
  • 2021-05-24
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-06-26
  • 2019-12-14
  • 2021-08-14
猜你喜欢
  • 2022-12-23
  • 2021-11-05
  • 2022-12-23
  • 2022-12-23
  • 2021-11-13
  • 2021-06-10
  • 2021-05-22
相关资源
相似解决方案