Thrift连接池,spring配置化,透明化调用(优化)
github项目地址:
看了 dubbo 的代码之后,我就在思考:thrift 也可以通过 invoker 与 proxy 的方式将服务透明化,使其调用方式与普通的 bean 完全一样。
经过压力测试,连接池效果还是挺不错的
了解连接池的都知道,如果每次请求都要先建立远程连接、发送请求、再断开连接,对系统资源的消耗是巨大的。因此请求 thrift 的 server 之后并没有必要立即关闭 io 资源,可以建立连接池来复用这些连接。
thrift 版本号 0.10.0
thrift 文件:
IThriftInfoTestService.thrift
IThriftTestService.thrift
IThriftInfoTestService.thrift
代码:
// IDL for IThriftInfoTestService. The same namespace is declared for the
// Java, C++, Perl and PHP generators.
// NOTE(review): the Java snippets in this article import the generated class from
// com.java.core.rpc.thrift.service, not com.java.thrift.service - confirm which package is intended.
namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service
// Service with a single RPC method.
service IThriftInfoTestService {
// Takes a string, a bool and a string-to-string map; returns a string.
string showInfoData(1:string name,2:bool b2,3:map<string,string> m2)
}
IThriftTestService.thrift
代码:
// IDL for IThriftTestService. The same namespace is declared for the
// Java, C++, Perl and PHP generators.
// NOTE(review): the Java snippets in this article import the generated class from
// com.java.core.rpc.thrift.service, not com.java.thrift.service - confirm which package is intended.
namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service
// Service with a single RPC method.
service IThriftTestService {
// Takes a string, a bool and a string-to-string map; returns a string.
string showThriftResult(1:string name,2:bool b2,3:map<string,string> m2)
}
provider (服务端)
spring配置文件 spring-thrift.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
>
<!-- Implementation classes of the two Thrift services -->
<bean id="thriftInfoTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftInfoTestServiceImpl" ></bean>
<bean id="thriftTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftTestServiceImpl" ></bean>
<!-- Processor factory: on init (convertTargetToTProcessor) it turns every bean listed in "targets" into a Thrift processor, so several services can be served together -->
<bean id="thriftProcessorFactory" class="com.java.core.rpc.thrift.supports.ThriftProcessorFactory"
init-method="convertTargetToTProcessor"
>
<property name="targets" >
<list >
<ref bean="thriftInfoTestServiceTarget"/>
<ref bean="thriftTestServiceTarget"/>
</list>
</property>
</bean>
<!-- Bootstrap class of the Thrift provider (server); initThriftServer is invoked as its init-method -->
<bean class="com.java.core.rpc.thrift.provider.AppThriftServer" init-method="initThriftServer" ></bean>
</beans>
ThriftInfoTestServiceImpl.java
package com.java.core.rpc.thrift.service.impl;
import java.util.Map;
import org.apache.thrift.TException;
import com.alibaba.fastjson.JSONObject;
import com.java.core.rpc.thrift.service.IThriftInfoTestService;
/**
 * Server-side implementation of the {@code IThriftInfoTestService} Thrift service.
 * It logs the received arguments to standard output and echoes the name back
 * together with the current time in milliseconds.
 */
public class ThriftInfoTestServiceImpl implements IThriftInfoTestService.Iface {

    /**
     * Prints the call arguments and builds the reply string.
     *
     * @param name    text echoed back in the reply
     * @param success flag that is only logged
     * @param map     key/value pairs that are only logged (serialized as JSON)
     * @return {@code name} followed by {@code " time : "} and the current epoch millis
     */
    @Override
    public String showInfoData(String name, boolean success, Map<String, String> map) throws TException {
        System.out.println(" ThriftInfoTestServiceImpl doing ...showInfoData()... ");
        final String mapAsJson = JSONObject.toJSONString(map);
        System.out.println(" map : " + mapAsJson);
        System.out.println(" success : " + success);
        System.out.println(" name : " + name);
        return name + " time : " + System.currentTimeMillis();
    }
}
ThriftTestServiceImpl.java 代码就不贴了。都一样
ThriftProcessorFactory.java processor工厂类
/**
 * Turns plain service implementations into Thrift processors and registers them
 * on a single {@link TMultiplexedProcessor}.
 *
 * <p>For every object in {@code targets} the factory looks up the Thrift-generated
 * {@code Xxx$Iface} interface the object implements, instantiates the matching
 * {@code Xxx$Processor} class reflectively and registers it under the service name
 * {@code Xxx} (the fully qualified name of the generated outer class).
 *
 * <p>Intended use (as in the Spring configuration): set {@code targets}, then call
 * {@link #convertTargetToTProcessor()} as the init method.
 */
public class ThriftProcessorFactory {
    /** Binary-name suffix of the generated service interface. */
    private final static String IFACE_NAME="$Iface";
    /** Binary-name suffix of the generated processor class. */
    private final static String PROCESS_NAME="$Processor";

    /** Service implementation objects to expose. */
    private List<Object> targets;
    /** Processors keyed by service name; populated by {@link #convertTargetToTProcessor()}. */
    private Map<String, TProcessor> processors;
    /** Multiplexed processor holding every entry of {@link #processors}; null until built. */
    private TMultiplexedProcessor multiplexedProcessor;

    public ThriftProcessorFactory() {
        super();
    }

    public TMultiplexedProcessor getMultiplexedProcessor() {
        return multiplexedProcessor;
    }

    public void setMultiplexedProcessor(TMultiplexedProcessor multiplexedProcessor) {
        this.multiplexedProcessor = multiplexedProcessor;
    }

    public Map<String, TProcessor> getProcessors() {
        return processors;
    }

    public void setProcessors(Map<String, TProcessor> processors) {
        this.processors = processors;
    }

    public List<Object> getTargets() {
        return targets;
    }

    public void setTargets(List<Object> targets) {
        this.targets = targets;
    }

    /**
     * Wraps every configured target in its generated processor and then builds the
     * multiplexed processor. Does nothing when no targets are configured.
     *
     * @throws IllegalStateException if a target does not implement a generated
     *         {@code $Iface} interface or its processor cannot be instantiated;
     *         failing here keeps a misconfigured service from being silently left
     *         unregistered
     */
    public void convertTargetToTProcessor(){
        if (targets == null || targets.isEmpty()) {
            return ;
        }
        processors = new HashMap<String, TProcessor>();
        for (Object target : targets ) {
            Class<?> iface = (target == null) ? null : findThriftIface(target.getClass());
            if (iface == null) {
                throw new IllegalStateException("Target "
                        + (target == null ? "null" : target.getClass().getName())
                        + " does not implement a Thrift-generated " + IFACE_NAME + " interface");
            }
            String ifaceName = iface.getName();
            // Strip the trailing "$Iface" to obtain the generated outer class name.
            String serviceName = ifaceName.substring(0, ifaceName.length() - IFACE_NAME.length());
            try {
                // Load the processor through the interface's own class loader: both
                // classes are nested in the same generated outer class.
                Class<?> processorClazz = Class.forName(
                        serviceName.concat(PROCESS_NAME), true, iface.getClassLoader());
                Object processorObj = processorClazz.getConstructor(iface).newInstance(target);
                if (processorObj instanceof TProcessor) {
                    processors.put(serviceName, (TProcessor) processorObj);
                }
            } catch (Exception e) {
                throw new IllegalStateException(
                        "Cannot create Thrift processor for service " + serviceName, e);
            }
        }
        initMultiplexedProcessor();
    }

    /**
     * Finds the generated service interface implemented by {@code type}. The class
     * and its superclasses are searched so that subclasses (for example generated
     * proxy subclasses) and implementations listing other interfaces first work too.
     *
     * @return the first interface whose name ends with {@code $Iface}, or null
     */
    private static Class<?> findThriftIface(Class<?> type) {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            for (Class<?> candidate : current.getInterfaces()) {
                if (candidate.getName().endsWith(IFACE_NAME)) {
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Creates the {@link TMultiplexedProcessor} and registers every processor under
     * its service name. Leaves the field untouched when there is nothing to register.
     */
    private void initMultiplexedProcessor(){
        if (processors == null || processors.isEmpty()) {
            return ;
        }
        multiplexedProcessor = new TMultiplexedProcessor();
        for (Map.Entry<String, TProcessor> entry : processors.entrySet()) {
            multiplexedProcessor.registerProcessor(entry.getKey(), entry.getValue());
        }
    }
}
AppThriftServer.java 重点,服务启动
/**
 * Starts the Thrift provider: a {@code TThreadedSelectorServer} with framed
 * transport and binary protocol, serving the multiplexed processor built by
 * {@code ThriftProcessorFactory}.
 *
 * <p>{@link #initThriftServer()} is meant to be used as the Spring init method. The
 * blocking {@code serve()} call runs on a dedicated background thread so that
 * container start-up is not blocked.
 */
public class AppThriftServer implements ApplicationContextAware {
    /** Single background thread that runs the blocking server loop. */
    private static ExecutorService executorService;
    /** Spring context, injected through {@link ApplicationContextAware}. */
    private static ApplicationContext context;
    /** TCP port the server listens on; defaults to the previously hard-coded 29999. */
    private int port = 29999;

    public AppThriftServer() {
        super();
        executorService = Executors.newSingleThreadExecutor();
    }

    /**
     * Submits the server loop to the background thread. Any start-up failure is
     * printed and the server socket is closed again.
     */
    public void initThriftServer(){
        final int listenPort = port;
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(" ThriftServer start ing ....");
                TNonblockingServerSocket transport = null;
                try {
                    // Non-blocking server socket
                    transport = new TNonblockingServerSocket(new InetSocketAddress(listenPort));
                    // The factory exposes the combined processor via getMultiplexedProcessor();
                    // it is null when no service target was registered.
                    ThriftProcessorFactory thriftProcessorFactory = context.getBean(ThriftProcessorFactory.class);
                    TProcessor processor = thriftProcessorFactory.getMultiplexedProcessor();
                    if (processor == null) {
                        throw new IllegalStateException(
                                "ThriftProcessorFactory has no registered services; nothing to serve");
                    }
                    // NIO: selector threads handle I/O, worker threads execute the service calls
                    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport);
                    args.processor(processor);
                    // The client must use TFramedTransport as well, otherwise calls fail
                    args.transportFactory(new TFramedTransport.Factory());
                    // Binary protocol
                    args.protocolFactory(new TBinaryProtocol.Factory());
                    TThreadedSelectorServer server = new TThreadedSelectorServer(args);
                    server.serve();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
        });
    }

    public int getPort() {
        return port;
    }

    /** Sets the listening port; must be called before {@link #initThriftServer()}. */
    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context=applicationContext;
    }
}
接着启动项目,服务端便开始监听;
接着是 customer 客户端
调用主体
/**
 * Client-side demo entry point: loads the Spring context, fetches
 * {@code TestService} and fires its Thrift call from one or more threads.
 */
public class App
{
    private static ApplicationContext applicationContext;

    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
        init();
        final TestService testService=applicationContext.getBean(TestService.class);
        // Callback executed by every test thread
        ITestListener testListener = new ITestListener() {
            public void doTest() {
                testService.doThriftInfoTest();
            }
        };
        // Run the test
        doThriftTest(1,testListener);
    }

    /**
     * Starts {@code threadCounts} threads and releases them together so that the
     * callback is invoked concurrently. Returns once the threads have been
     * released; it does not wait for them to finish.
     *
     * <p>Two latches replace the former wait/notifyAll + fixed sleep: a worker that
     * reached {@code wait()} after {@code notifyAll()} had fired would have blocked
     * forever, and spurious wakeups were not guarded against.
     *
     * @param threadCounts number of concurrent callers; values &lt;= 0 start nothing
     * @param testListener callback run by each thread; may be null (threads then do nothing)
     */
    public static void doThriftTest (int threadCounts,final ITestListener testListener){
        if (threadCounts <= 0) {
            return;
        }
        // Counted down by each worker once it is running.
        final java.util.concurrent.CountDownLatch ready =
                new java.util.concurrent.CountDownLatch(threadCounts);
        // Opened once by the caller to release all workers at the same time.
        final java.util.concurrent.CountDownLatch start =
                new java.util.concurrent.CountDownLatch(1);
        for (int i=0 ;i< threadCounts ;i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    ready.countDown();
                    try {
                        start.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (testListener != null) {
                        testListener.doTest();
                    }
                }
            }).start();
        }
        try {
            ready.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always release the workers so that none of them stays parked.
            start.countDown();
        }
    }

    /** Callback invoked by each test thread. */
    public interface ITestListener {
        public void doTest();
    }

    /** Loads the Spring application context from the file system. */
    public static void init(){
        applicationContext = new FileSystemXmlApplicationContext("/resources/applicationContext.xml");
    }
}
现在是直接在spring配置文件中配置thrift service的bean,调用代码跟一般的bean一模一样
TestService.java 代码:
/**
 * Calls {@code IThriftTestService.showThriftResult} through the bean obtained
 * from the Spring context, i.e. the remote service is used like any other bean.
 * A {@code TException} raised by the call is printed, not propagated.
 */
public void doThriftTest(){
    final String greeting = "zhuangjiesen ...IThriftTestService doing...";
    final Map<String, String> payload = new HashMap<String, String>();
    payload.put("name", "庄杰森");
    payload.put("IThriftTestService", "client");
    payload.put("content", "thrift 的 rpc 调用");

    // The bean behind this interface is a dynamic proxy, which makes the Thrift call transparent
    final IThriftTestService.Iface remote =
            BeanHelper.getContext().getBean(IThriftTestService.Iface.class);
    try {
        remote.showThriftResult(greeting, true, payload);
    } catch (TException failure) {
        failure.printStackTrace();
    }
}
/**
 * Calls {@code IThriftInfoTestService.showInfoData} through the bean obtained
 * from the Spring context, i.e. the remote service is used like any other bean.
 * A {@code TException} raised by the call is printed, not propagated.
 */
public void doThriftInfoTest(){
    final String greeting = "zhuangjiesen ...IThriftInfoTestService doing...";
    final Map<String, String> payload = new HashMap<String, String>();
    payload.put("name", "庄杰森");
    payload.put("IThriftInfoTestService", "client");
    payload.put("content", "thrift 的 rpc 调用");

    // The bean behind this interface is a dynamic proxy, which makes the Thrift call transparent
    final IThriftInfoTestService.Iface remote =
            BeanHelper.getContext().getBean(IThriftInfoTestService.Iface.class);
    try {
        remote.showInfoData(greeting, true, payload);
    } catch (TException failure) {
        failure.printStackTrace();
    }
}
thrift中 spring 的配置文件 :spring-thrift.xml
代码:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
>
<!-- Thrift connection pool: server address plus the minimum / maximum number of pooled connections -->
<bean id="thriftConnectionPool" class="com.java.core.rpc.thrift.supports.ThriftConnectionPool">
<property name="host" value="127.0.0.1"/>
<property name="port" value="29999"/>
<property name="maxConnections" value="20"/>
<property name="minConnections" value="5"/>
</bean>
<!-- Service manager: creates the client proxies on top of the connection pool -->
<bean id="thriftServiceManager" class="com.java.core.rpc.thrift.supports.ThriftServiceManager" >
<property name="thriftConnectionPool" ref="thriftConnectionPool"></property>
</bean>
<!-- Each remote service is declared through a FactoryBean implementation that returns a dynamic proxy for the configured Iface -->
<bean id="thriftTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
<property name="thriftServiceManager" ref="thriftServiceManager" ></property>
<property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftTestService.Iface" ></property>
</bean>
<bean id="thriftInfoTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
<property name="thriftServiceManager" ref="thriftServiceManager" ></property>
<property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftInfoTestService.Iface" ></property>
</bean>
</beans>
ThriftSpringFactoryBean.java:关于 spring 容器中 FactoryBean 接口的用法可以自行查阅,也可以对照 org.springframework.aop.framework.ProxyFactoryBean 的实现
代码
/**
 * Spring {@code FactoryBean} that exposes a Thrift service interface as a bean.
 * The object handed to the container is whatever
 * {@code ThriftServiceManager.getThriftClient} returns for the configured
 * interface class.
 *
 * @param <T> the Thrift service interface type
 */
public class ThriftSpringFactoryBean<T> implements FactoryBean<T> {

    /** Interface class the produced client implements. */
    private Class serviceIfaceClass;
    /** Creates the client for {@link #serviceIfaceClass}. */
    private ThriftServiceManager thriftServiceManager;

    public void setThriftServiceManager(ThriftServiceManager thriftServiceManager) {
        this.thriftServiceManager = thriftServiceManager;
    }

    public ThriftServiceManager getThriftServiceManager() {
        return this.thriftServiceManager;
    }

    public void setServiceIfaceClass(Class serviceIfaceClass) {
        this.serviceIfaceClass = serviceIfaceClass;
    }

    public Class getServiceIfaceClass() {
        return this.serviceIfaceClass;
    }

    /** Asks the service manager for a client of the configured interface. */
    public T getObject() throws Exception {
        final Object client = this.thriftServiceManager.getThriftClient(this.serviceIfaceClass);
        return (T) client;
    }

    /** Reports the configured interface class as the bean type. */
    public Class<T> getObjectType() {
        return this.serviceIfaceClass;
    }

    /** The produced client is declared a singleton. */
    public boolean isSingleton() {
        return true;
    }
}
ThriftServiceManager 服务管理类
代码:
/**
 * Creates client-side proxies for Thrift service interfaces. Each proxy is backed
 * by a {@code ThriftServiceProxyInvocation} that is given the interface class and
 * the shared connection pool.
 */
public class ThriftServiceManager {

    /** Pool handed to every invocation handler created by this manager. */
    private ThriftConnectionPool thriftConnectionPool;

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }

    public ThriftConnectionPool getThriftConnectionPool() {
        return this.thriftConnectionPool;
    }

    /**
     * Builds a new dynamic proxy implementing {@code serviceIfaceClass}.
     *
     * @param serviceIfaceClass the service interface to proxy
     * @return a proxy instance implementing the interface
     * @throws RuntimeException if {@code serviceIfaceClass} is not an interface
     */
    public <T> T getThriftClient(Class<T> serviceIfaceClass){
        if (serviceIfaceClass.isInterface()) {
            final ThriftServiceProxyInvocation handler = new ThriftServiceProxyInvocation();
            // The handler needs the proxied interface and the connection pool
            handler.setIfaceClazz(serviceIfaceClass);
            handler.setThriftConnectionPool(this.thriftConnectionPool);
            final Object remoteProxy = Proxy.newProxyInstance(
                    serviceIfaceClass.getClassLoader(),
                    new Class[]{ serviceIfaceClass },
                    handler);
            return serviceIfaceClass.cast(remoteProxy);
        }
        throw new RuntimeException("类型错误");
    }
}
ThriftServiceProxyInvocation.java 动态代理类,获取连接并调用远程方法
代码:
/**
 * Invocation handler behind the client proxies: for every interface call it
 * borrows a protocol from the pool, instantiates the generated Thrift client
 * reflectively, forwards the call and recycles the protocol afterwards.
 *
 * <p>Failures are propagated to the caller instead of being printed and turned
 * into a {@code null} result: the exception thrown by the generated client (for
 * example a {@code TException}) is rethrown as-is, reflective set-up problems are
 * wrapped in an {@link IllegalStateException}.
 */
public class ThriftServiceProxyInvocation implements InvocationHandler {
    /* Iface class of the Thrift service */
    private Class ifaceClazz;
    /* Thrift connection pool */
    private ThriftConnectionPool thriftConnectionPool;

    /**
     * Forwards {@code method} to a generated Thrift client bound to a pooled protocol.
     *
     * @return the remote call's result, or null when no interface class is configured
     * @throws Throwable whatever the generated client throws; {@link IllegalStateException}
     *         when the client class cannot be loaded or instantiated
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // hashCode/equals/toString are dispatched to the handler as well. Answer them
        // locally: forwarding them to a per-call client instance gives an unstable
        // hashCode, makes proxy.equals(proxy) false and borrows a connection for nothing.
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }
        System.out.println(" ThriftServiceProxyInvocation invoke doing before ....");
        if (ifaceClazz == null) {
            return null;
        }
        Object result = null;
        try {
            String serviceIfaceClassName = ifaceClazz.getName();
            String serviceClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,"");
            String serviceClientClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,ThriftConstant.CLIENT_NAME);
            Class<?> clientClazz = Class.forName(serviceClientClassName);
            // Borrow a protocol from the pool
            TProtocol protocol = thriftConnectionPool.getProtocol(serviceClassName);
            Object clientInstance= clientClazz.getConstructor(TProtocol.class).newInstance(protocol);
            result=method.invoke(clientInstance, args);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // Unwrap so that callers see the client's own exception.
            Throwable cause = e.getCause();
            throw cause != null ? cause : e;
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Checked reflective failures are not declared by the service interface.
            throw new IllegalStateException(
                    "Cannot invoke " + method.getName() + " on " + ifaceClazz.getName(), e);
        } finally {
            // Return the protocol to the pool
            thriftConnectionPool.recycleProtocol();
        }
        System.out.println(" ThriftServiceProxyInvocation invoke doing after ....");
        return result;
    }

    /** Handles the {@code java.lang.Object} methods routed through the proxy without a remote call. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        return "ThriftServiceProxy[" + (ifaceClazz == null ? "unconfigured" : ifaceClazz.getName()) + "]";
    }

    public Class getIfaceClazz() {
        return ifaceClazz;
    }

    public void setIfaceClazz(Class ifaceClazz) {
        this.ifaceClazz = ifaceClazz;
    }

    public ThriftConnectionPool getThriftConnectionPool() {
        return thriftConnectionPool;
    }

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }
}
接下来就是重点 thrift 连接池 ThriftConnectionPool.java
代码:
重点方法:getProtocolInternal() 获取连接
/**
 * Pool of open Thrift client connections (binary protocol over framed transport).
 *
 * <p>Usage pattern: a thread calls {@link #getProtocol(String)} (or
 * {@link #getProtocolInternal()}), performs its call and then calls
 * {@link #recycleProtocol()} on the same thread; the borrowed protocol is
 * remembered in a thread-local in between.
 *
 * <p>{@code connectionsCount} tracks every live connection (idle or borrowed) and
 * is only changed while holding this object's monitor. A periodic task closes idle
 * connections above {@code minConnections}. Call {@link #destroy()} on shutdown to
 * close idle connections and stop that task.
 */
public class ThriftConnectionPool implements InitializingBean {
    private String host;
    private int port;
    private int minConnections = 5;
    private int maxConnections = 10;
    /** Live connections, idle plus borrowed; guarded by {@code this}. */
    private int connectionsCount = 0;
    /** Seconds a borrower waits for a recycled connection once the pool is exhausted. */
    private int waitQueueSeconds = 10 ;
    /** Interval in seconds between two runs of the shrink task. */
    private int recycleSeconds=10;
    /** Idle connections. */
    private LinkedBlockingQueue<TProtocol> blockingQueue;
    /** Connection currently borrowed by the calling thread. */
    private ThreadLocal<TProtocol> protocolLocal = new ThreadLocal<TProtocol>();
    /** Runs the shrink task. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    public ThriftConnectionPool() {
    }

    /**
     * Creates the idle queue, opens up to {@code minConnections} connections and
     * schedules the periodic shrink task. A connection that cannot be opened is
     * skipped (the failure is printed by {@link #createNewProtocol()}); it is then
     * created lazily on demand instead of failing with a NullPointerException.
     */
    public synchronized void initThriftConnectionPool(){
        blockingQueue = new LinkedBlockingQueue<TProtocol>();
        for (int i = 0 ; i < minConnections ; i++) {
            TProtocol protocol = createNewProtocol();
            if (protocol != null) {
                blockingQueue.add(protocol);
            }
        }
        // Shrink repeatedly; a one-shot schedule would trim the pool only once.
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    reducePool();
                } catch (RuntimeException e) {
                    // An escaping exception would cancel all further runs.
                    e.printStackTrace();
                }
            }
        },recycleSeconds, recycleSeconds, TimeUnit.SECONDS);
    }

    /**
     * Opens a new connection if the pool is below {@code maxConnections}.
     *
     * @return the opened protocol, or null when the limit is reached or the
     *         connection could not be opened (the failure is printed)
     */
    public synchronized TProtocol createNewProtocol(){
        TProtocol protocol = null;
        if (connectionsCount < maxConnections) {
            try {
                TSocket socket = new TSocket(host,port);
                TFramedTransport framedTransport = new TFramedTransport(socket);
                TBinaryProtocol binaryProtocol = new TBinaryProtocol(framedTransport);
                binaryProtocol.getTransport().open();
                protocol = binaryProtocol;
                connectionsCount ++ ;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return protocol;
    }

    /**
     * Borrows a connection for the calling thread: an idle one if available,
     * otherwise a new one, otherwise the first one recycled within
     * {@code waitQueueSeconds}. Closed connections found in the pool are discarded
     * and replaced.
     *
     * @return the borrowed protocol, or null when none became available in time
     */
    public TProtocol getProtocolInternal(){
        protocolLocal.remove();
        TProtocol protocol = blockingQueue.poll();
        if (protocol != null && isClosed(protocol)) {
            // Free the slot of the dead connection before trying to replace it.
            discardProtocol(protocol);
            protocol = null;
        }
        if (protocol == null) {
            protocol = createNewProtocol();
        }
        if (protocol == null) {
            // Pool exhausted (or connect failed): block on the idle queue instead of
            // spinning on a hand-rolled waiter list.
            try {
                protocol = blockingQueue.poll(waitQueueSeconds,TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (protocol != null && isClosed(protocol)) {
                discardProtocol(protocol);
                protocol = createNewProtocol();
            }
        }
        protocolLocal.set(protocol);
        return protocol;
    }

    /**
     * Borrows a connection and wraps it for the given multiplexed service.
     *
     * @param serviceName name the service is registered under on the server
     * @throws IllegalStateException if no connection became available
     */
    public TProtocol getProtocol(String serviceName){
        TProtocol protocol = getProtocolInternal();
        if (protocol == null) {
            throw new IllegalStateException("No Thrift connection to " + host + ":" + port
                    + " available within " + waitQueueSeconds + " seconds");
        }
        return new TMultiplexedProtocol(protocol,serviceName);
    }

    /**
     * Returns the connection borrowed by the calling thread to the pool. A closed
     * connection is discarded and replaced when possible. Does nothing when the
     * thread holds no connection.
     */
    public void recycleProtocol(){
        TProtocol protocol = protocolLocal.get();
        protocolLocal.remove();
        if (protocol == null) {
            return;
        }
        if (scheduledExecutorService.isShutdown()) {
            // Pool already destroyed: close instead of re-pooling.
            discardProtocol(protocol);
            return;
        }
        if (isClosed(protocol)) {
            discardProtocol(protocol);
            protocol = createNewProtocol();
        }
        if (protocol != null) {
            blockingQueue.add(protocol);
        }
    }

    /** True when the protocol has a transport that is no longer open. */
    private static boolean isClosed(TProtocol protocol) {
        return protocol.getTransport() != null && !protocol.getTransport().isOpen();
    }

    /** Closes the connection if still open and releases its slot in {@code connectionsCount}. */
    private synchronized void discardProtocol(TProtocol protocol) {
        try {
            if (protocol.getTransport() != null && protocol.getTransport().isOpen()) {
                protocol.getTransport().close();
            }
        } finally {
            if (connectionsCount > 0) {
                connectionsCount --;
            }
        }
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getMinConnections() {
        return minConnections;
    }

    public void setMinConnections(int minConnections) {
        this.minConnections = minConnections;
    }

    public int getMaxConnections() {
        return maxConnections;
    }

    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    public int getWaitQueueSeconds() {
        return waitQueueSeconds;
    }

    public void setWaitQueueSeconds(int waitQueueSeconds) {
        this.waitQueueSeconds = waitQueueSeconds;
    }

    /** Initializes the pool once all properties have been set. */
    public void afterPropertiesSet() throws Exception {
        initThriftConnectionPool();
    }

    @Override
    protected void finalize() throws Throwable {
        try {
            destroy();
        } finally {
            super.finalize();
        }
    }

    /**
     * Stops the shrink task and closes every idle connection. Never blocks:
     * connections still borrowed are closed when they are recycled.
     */
    public synchronized void destroy(){
        scheduledExecutorService.shutdownNow();
        if (blockingQueue == null) {
            return;
        }
        TProtocol protocol;
        while ((protocol = blockingQueue.poll()) != null) {
            discardProtocol(protocol);
        }
    }

    /**
     * Closes idle connections while more than {@code minConnections} are alive.
     * Only connections that are idle right now are considered, so the monitor is
     * never held while waiting.
     */
    private synchronized void reducePool(){
        int surplus = connectionsCount - minConnections;
        for (int i = 0 ; i < surplus ; i++) {
            TProtocol idle = blockingQueue.poll();
            if (idle == null) {
                // Everything else is currently borrowed.
                break;
            }
            discardProtocol(idle);
        }
    }
}