前文《由浅入深了解Thrift之客户端连接池化》中我们已经实现了服务调用端 连接的池化,实现的过于简陋,离实际的项目运用还很遥远。本文将在进一步改造,主要是两方面:1、服务端如何注册多个服务 2、调用端如何获取服务对象而不是服务连接
一、实现思路
1、通过spring配置文件,配置服务类
2、反射生成服务类实例,依次注册服务
调用端获取服务对象亦是如此,废话不多说了
二、主要实现
1、服务端
/** * thrift server端,向zk中注册server address * * @author wy * */ public class ThriftServiceServerFactory implements InitializingBean { private final Logger logger = LoggerFactory.getLogger(getClass()); // thrift server 服务端口 private Integer port; // default 权重 private Integer priority = 1; // service 实现类 private Map<String, Object> services; // thrift server 注册路径 private String configPath; // thrift service module private String configService; private ThriftServerIpTransfer ipTransfer; // thrift server注册类 private ThriftServerAddressReporter addressReporter; // thrift server开启服务 private ServerThread serverThread; @Override public void afterPropertiesSet() throws Exception { if (ipTransfer == null) { ipTransfer = new LocalNetworkIpTransfer(); } String ip = ipTransfer.getIp(); if (ip == null) { throw new NullPointerException("cant find server ip..."); } String hostName = ip + ":" + port + ":" + priority; logger.info("registry services address = " + hostName); logger.info("registry services count = " + services.size()); Set<Entry<String, Object>> set = services.entrySet(); Iterator<Entry<String, Object>> iter = set.iterator(); Map<String, TProcessor> processors = new HashMap<String, TProcessor>(); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); for (; iter.hasNext();) { Entry<String, Object> entry = iter.next(); // 通过反射获取service interface String servieName = entry.getKey(); Object serviceImplObject = entry.getValue(); Class<?> serviceClass = serviceImplObject.getClass(); // 返回本类直接实现的接口.不包含泛型参数信息 Class<?>[] interfaces = serviceClass.getInterfaces(); if (interfaces.length == 0) { throw new IllegalClassFormatException("service-class should implements Iface"); } TProcessor processor = null; for (Class<?> interfaceClazz : interfaces) { logger.info("service Interfaces count = " + interfaces.length); // 获取源代码中给出的'底层类'简称 String cname = interfaceClazz.getSimpleName(); if (!cname.equals("Iface")) { continue; } // 获取外部类Class;以String的形式,返回Class对象的'实体'名称 String pname = interfaceClazz.getEnclosingClass().getName() + "$Processor"; Class<?> pclass = classLoader.loadClass(pname); // class是否相同或是另一个类的超类或接口 if (!pclass.isAssignableFrom(Processor.class)) { continue; } Constructor<?> constructor = pclass.getConstructor(interfaceClazz); processor = (TProcessor) constructor.newInstance(serviceImplObject); processors.put(servieName, processor); break; } if (processor == null) { throw new IllegalClassFormatException("service-class should implements Iface"); } } // 需要单独的线程,因为serve方法是阻塞的. serverThread = new ServerThread(processors, port); serverThread.start(); // 向ZK中注册服务 if (addressReporter != null) { addressReporter.report(configPath, hostName); } } class ServerThread extends Thread { private TServer server; ServerThread(Map<String, TProcessor> processors, int port) throws Exception { // 设置传输通道 TNonblockingServerSocket serverTransport = new TNonblockingServerSocket( port); // 设置二进制协议 Factory protocolFactory = new TBinaryProtocol.Factory(); // 网络服务模型 TThreadedSelectorServer.Args tssArgs = new TThreadedSelectorServer.Args( serverTransport); // TMultiplexedProcessor processor = new TMultiplexedProcessor(); Set<Entry<String, TProcessor>> set = processors.entrySet(); Iterator<Entry<String, TProcessor>> iter = set.iterator(); // for (; iter.hasNext();) { Entry<String, TProcessor> entry = iter.next(); processor.registerProcessor(entry.getKey(), (TProcessor) entry.getValue()); tssArgs.processor(processor); } tssArgs.protocolFactory(protocolFactory); // tssArgs.transportFactory(new TFramedTransport.Factory()); int num = Runtime.getRuntime().availableProcessors() * 2 + 1; tssArgs.selectorThreads(num); tssArgs.workerThreads(num * 10); server = new TThreadedSelectorServer(tssArgs); } @Override public void run() { try { server.serve(); } catch (Exception e) { e.printStackTrace(); } } // 关闭thrift server public void stopServer() { if (server != null && !server.isServing()) { server.stop(); } } } // 关闭thrift server public void close() { serverThread.stopServer(); } public void setServices(Map<String, Object> services) { this.services = services; } public void setPriority(Integer priority) { this.priority = priority; } public void setPort(Integer port) { this.port = port; } public void setIpTransfer(ThriftServerIpTransfer ipTransfer) { this.ipTransfer = ipTransfer; } public void setAddressReporter(ThriftServerAddressReporter addressReporter) { this.addressReporter = addressReporter; } public void setConfigPath(String configPath) { this.configPath = configPath; } public String getConfigService() { return configService; } public void setConfigService(String configService) { this.configService = configService; } }