Thrift 连接池,spring 配置化,透明化调用(优化)

1,901 阅读7分钟

Thrift连接池,spring配置化,透明化调用(优化)

github项目地址:

github项目地址

由于看了 dubbo 的代码,就在思考:thrift 也可以通过 invoker 与 proxy 的方式将服务透明化,使远程服务的调用方式与普通的 spring bean 完全一样。

经过压力测试,连接池效果还是挺不错的

了解连接池的都知道,每次请求都先建立远程连接、请求结束后再断开连接,对系统资源的消耗是巨大的;因此每次请求 thrift server 之后并没有必要关闭 IO 资源,可以把连接交给连接池统一管理并复用。

thrift 版本号 0.10.0

thrift 文件:
IThriftInfoTestService.thrift
IThriftTestService.thrift
IThriftInfoTestService.thrift

代码:

// Thrift IDL for the "info" demo service; the same namespace is declared for
// every target language.
// NOTE(review): the Java sources shown later import this service from
// com.java.core.rpc.thrift.service, which differs from the java namespace
// declared here - confirm which package the generated code really uses.
namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service

// One RPC: takes a name, a boolean flag and a string-to-string map and
// returns a string.
service  IThriftInfoTestService {
  string showInfoData(1:string name,2:bool b2,3:map<string,string> m2)
}
IThriftTestService.thrift

代码:

// Thrift IDL for the second demo service; the same namespace is declared for
// every target language.
// NOTE(review): the Spring configuration shown later references this service as
// com.java.core.rpc.thrift.service.IThriftTestService, which differs from the
// java namespace declared here - confirm which package the generated code uses.
namespace java com.java.thrift.service
namespace cpp com.java.thrift.service
namespace perl com.java.thrift.service
namespace php com.java.thrift.service

// One RPC: takes a name, a boolean flag and a string-to-string map and
// returns a string.
service  IThriftTestService {
  string showThriftResult(1:string name,2:bool b2,3:map<string,string> m2)
}

provider (服务端)

spring配置文件 spring-thrift.xml


<?xml version="1.0" encoding="UTF-8"?>
<!-- Provider (server side) Spring context: declares the Thrift service
     implementations, the factory that turns them into processors, and the
     bean that starts the Thrift server. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:mvc="http://www.springframework.org/schema/mvc" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd    
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd  
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
>

    <!-- Implementation beans of the two Thrift services -->
    <bean id="thriftInfoTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftInfoTestServiceImpl" ></bean>
    <bean id="thriftTestServiceTarget" class="com.java.core.rpc.thrift.service.impl.ThriftTestServiceImpl" ></bean>



    <!-- Processor factory for multiple services: receives the implementation
         beans through the "targets" list; its init-method
         convertTargetToTProcessor converts them into TProcessor instances. -->
    <bean id="thriftProcessorFactory"  class="com.java.core.rpc.thrift.supports.ThriftProcessorFactory" 
        init-method="convertTargetToTProcessor"
        >
        <property name="targets" >
            <list >
                <ref bean="thriftInfoTestServiceTarget"/>
                <ref bean="thriftTestServiceTarget"/>
            </list>
        </property>

    </bean>

    <!-- Startup bean of the Thrift provider: its init-method initThriftServer
         starts the server. -->
    <bean class="com.java.core.rpc.thrift.provider.AppThriftServer" init-method="initThriftServer"  ></bean>


</beans>

ThriftInfoTestServiceImpl.java

package com.java.core.rpc.thrift.service.impl;



import java.util.Map;

import org.apache.thrift.TException;

import com.alibaba.fastjson.JSONObject;
import com.java.core.rpc.thrift.service.IThriftInfoTestService;

/**
 * Demo server-side implementation of {@code IThriftInfoTestService}: it logs the
 * received arguments to standard output and echoes the name back together with
 * the current time.
 */
public class ThriftInfoTestServiceImpl implements IThriftInfoTestService.Iface {

    /**
     * Logs every argument and builds the reply string.
     *
     * @param name    value echoed back in the reply
     * @param success flag sent by the caller; only logged
     * @param map     key/value payload; only logged, rendered as JSON
     * @return {@code name + " time : " + <current time in milliseconds>}
     * @throws TException declared by the generated interface; not thrown here
     */
    @Override
    public String showInfoData(String name, boolean success, Map<String, String> map) throws TException {
        System.out.println(" ThriftInfoTestServiceImpl doing ...showInfoData()... ");

        final String mapAsJson = JSONObject.toJSONString(map);
        System.out.println(" map : " + mapAsJson);
        System.out.println(" success : " + String.valueOf(success));
        System.out.println(" name : " + name);

        return name + " time : " + System.currentTimeMillis();
    }

}

ThriftTestServiceImpl.java 代码就不贴了。都一样
ThriftProcessorFactory.java processor工厂类

/**
 * Builds one Thrift {@code TProcessor} per configured service implementation and
 * registers all of them on a single {@code TMultiplexedProcessor}.
 *
 * <p>The service name used for registration is the fully qualified name of the
 * generated outer class, i.e. the name of the implemented {@code $Iface}
 * interface without the {@code $Iface} suffix.
 *
 * <p>Configure {@link #setTargets(List)} and then call
 * {@link #convertTargetToTProcessor()} (the Spring configuration does this via
 * {@code init-method}). Conversion fails fast: a target that cannot be turned
 * into a processor raises an exception instead of being skipped silently.
 */
public class ThriftProcessorFactory {

    private final static String IFACE_NAME="$Iface";
    private final static String PROCESS_NAME="$Processor";

    /** Service implementation objects to expose. */
    private List<Object> targets;
    /** Processors keyed by service name; rebuilt on every conversion. */
    private Map<String, TProcessor> processors;
    /** Multiplexed processor holding every entry of {@link #processors}. */
    private TMultiplexedProcessor multiplexedProcessor;

    public ThriftProcessorFactory() {
        super();
    }

    public TMultiplexedProcessor getMultiplexedProcessor() {
        return multiplexedProcessor;
    }

    public void setMultiplexedProcessor(TMultiplexedProcessor multiplexedProcessor) {
        this.multiplexedProcessor = multiplexedProcessor;
    }

    /**
     * Alias of {@link #getMultiplexedProcessor()} typed as the plain
     * {@code TProcessor}; provided because the server bootstrap asks the
     * factory for "the processor".
     *
     * @return the multiplexed processor, or {@code null} before conversion ran
     */
    public TProcessor getProcessor() {
        return multiplexedProcessor;
    }

    public Map<String, TProcessor> getProcessors() {
        return processors;
    }

    public void setProcessors(Map<String, TProcessor> processors) {
        this.processors = processors;
    }

    public List<Object> getTargets() {
        return targets;
    }

    public void setTargets(List<Object> targets) {
        this.targets = targets;
    }

    /**
     * Wraps every configured target into its generated {@code $Processor} and
     * registers the result on a fresh multiplexed processor.
     *
     * <p>A {@code null} or empty target list yields an empty processor map and an
     * empty multiplexed processor rather than leaving both fields {@code null}.
     *
     * @throws IllegalStateException if a target is {@code null}, implements no
     *         {@code $Iface} interface, or its processor cannot be instantiated
     */
    public void convertTargetToTProcessor(){
        Map<String, TProcessor> built = new HashMap<String, TProcessor>();
        if (targets != null) {
            for (Object target : targets) {
                if (target == null) {
                    throw new IllegalStateException("targets must not contain null");
                }
                Class<?> iface = findThriftIface(target.getClass());
                String ifaceName = iface.getName();
                // Strip the trailing "$Iface" to obtain the generated outer class name.
                String serviceName = ifaceName.substring(0, ifaceName.length() - IFACE_NAME.length());
                built.put(serviceName, createProcessor(serviceName, iface, target));
            }
        }
        processors = built;
        initMultiplexedProcessor();
    }

    /**
     * Finds the generated {@code $Iface} interface implemented by a target class,
     * looking at the class itself and then at its superclasses, so the result
     * does not depend on the order of the {@code implements} clause.
     */
    private Class<?> findThriftIface(Class<?> targetClass) {
        for (Class<?> current = targetClass; current != null; current = current.getSuperclass()) {
            for (Class<?> candidate : current.getInterfaces()) {
                if (candidate.getName().endsWith(IFACE_NAME)) {
                    return candidate;
                }
            }
        }
        throw new IllegalStateException(
                targetClass.getName() + " does not implement a Thrift " + IFACE_NAME + " interface");
    }

    /**
     * Reflectively instantiates {@code <serviceName>$Processor} around the target.
     */
    private TProcessor createProcessor(String serviceName, Class<?> iface, Object target) {
        String processorClassName = serviceName.concat(PROCESS_NAME);
        Object processorObj;
        try {
            // The processor is nested in the same generated class as the Iface,
            // so the Iface's class loader is the one that can see it.
            Class<?> processorClazz = Class.forName(processorClassName, true, iface.getClassLoader());
            processorObj = processorClazz.getConstructor(iface).newInstance(target);
        } catch (Exception e) {
            throw new IllegalStateException("Cannot create Thrift processor " + processorClassName, e);
        }
        if (!(processorObj instanceof TProcessor)) {
            throw new IllegalStateException(processorClassName + " is not a TProcessor");
        }
        return (TProcessor) processorObj;
    }

    /**
     * Creates the {@code TMultiplexedProcessor} and registers every processor
     * under its service name.
     */
    private void initMultiplexedProcessor(){
        TMultiplexedProcessor multiplexed = new TMultiplexedProcessor();
        for (Map.Entry<String, TProcessor> entry : processors.entrySet()) {
            multiplexed.registerProcessor(entry.getKey(), entry.getValue());
        }
        multiplexedProcessor = multiplexed;
    }
}
AppThriftServer.java 重点,服务启动

/**
 * Starts the Thrift provider: a {@code TThreadedSelectorServer} with framed
 * transport and binary protocol that serves the multiplexed processor built by
 * {@code ThriftProcessorFactory}.
 *
 * <p>The server loop runs on a dedicated single-thread executor so that the
 * blocking {@code serve()} call does not stall Spring's startup thread.
 */
public class AppThriftServer  implements ApplicationContextAware {

    /** Port used when none is configured (the value that used to be hard-coded). */
    private static final int DEFAULT_PORT = 29999;

    /** Runs the blocking server loop; one instance per server bean. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    /** Spring context, injected through {@link #setApplicationContext}. */
    private ApplicationContext context;
    /** TCP port the server listens on. */
    private int port = DEFAULT_PORT;

    public AppThriftServer() {
        super();
    }

    public int getPort() {
        return port;
    }

    /**
     * @param port TCP port to listen on; defaults to 29999 when never set
     */
    public void setPort(int port) {
        this.port = port;
    }

    /**
     * Submits the server loop to the background executor and returns at once.
     *
     * @throws IllegalStateException if the application context was not injected
     */
    public void initThriftServer(){
        final ApplicationContext ctx = context;
        if (ctx == null) {
            throw new IllegalStateException("ApplicationContext has not been injected");
        }
        final int listenPort = port;
        executorService.execute(new Runnable() {

            @Override
            public void run() {
                System.out.println(" ThriftServer start ing ....");
                TNonblockingServerSocket transport = null;
                try {
                    // Non-blocking server socket
                    transport = new TNonblockingServerSocket(new InetSocketAddress(listenPort));
                    // The factory exposes the multiplexed processor through
                    // getMultiplexedProcessor(); it has no getProcessor() method.
                    ThriftProcessorFactory thriftProcessorFactory = ctx.getBean(ThriftProcessorFactory.class);
                    TProcessor processor = thriftProcessorFactory.getMultiplexedProcessor();
                    if (processor == null) {
                        throw new IllegalStateException("ThriftProcessorFactory produced no processor");
                    }
                    // NIO: selector threads handle I/O, worker threads run the service calls
                    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport);
                    args.processor(processor);
                    // The client must use TFramedTransport as well, otherwise calls fail
                    args.transportFactory(new TFramedTransport.Factory());
                    // Binary protocol
                    args.protocolFactory(new TBinaryProtocol.Factory());
                    TThreadedSelectorServer server = new TThreadedSelectorServer(args);
                    // Blocks until the server stops
                    server.serve();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
        });

    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context=applicationContext;
    }

}

接着启动项目,服务端便开始监听;

接着是 consumer(服务消费方,即客户端)

调用主体

/**
 * Client-side demo entry point: loads the Spring context, fetches
 * {@code TestService} and fires the Thrift test call from a configurable number
 * of threads that are all released at the same moment.
 */
public class App
{

    private static ApplicationContext applicationContext;

    public static void main( String[] args )
    {
        System.out.println( "Hello World!" );
        init();
        final TestService testService=applicationContext.getBean(TestService.class);

        // Callback executed by every test thread
        ITestListener testListener = new ITestListener() {
            public void doTest() {
                testService.doThriftInfoTest();
            }
        };

        // Run the test
        doThriftTest(1,testListener);
    }


    /**
     * Starts {@code threadCounts} threads and releases them together so that the
     * callback runs concurrently; used to exercise the pool under load.
     *
     * <p>Two latches replace the earlier wait/notify plus one-second sleep: a
     * thread that reached the gate late could miss the notification and hang,
     * and a spurious wake-up could start a thread early.
     *
     * @param threadCounts number of concurrent threads; values below 1 do nothing
     * @param testListener callback run once per thread; may be {@code null}
     */
    public static void doThriftTest (int threadCounts,final ITestListener testListener){
        if (threadCounts <= 0) {
            return;
        }
        // Counts threads that have reached the start line.
        final java.util.concurrent.CountDownLatch ready =
                new java.util.concurrent.CountDownLatch(threadCounts);
        // Opened once, after every thread is ready.
        final java.util.concurrent.CountDownLatch startGate =
                new java.util.concurrent.CountDownLatch(1);

        for (int i=0 ;i< threadCounts ;i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    ready.countDown();
                    try {
                        startGate.await();
                    } catch (InterruptedException e) {
                        // Restore the flag and give up: the test was cancelled.
                        Thread.currentThread().interrupt();
                        return;
                    }

                    if (testListener != null) {
                        testListener.doTest();
                    }
                }
            }).start();
        }

        try {
            ready.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Always open the gate so no worker thread is left blocked forever.
            startGate.countDown();
        }

    }


    /** Callback invoked by each test thread. */
    public interface ITestListener {
        public void doTest();
    }

    /** Loads the Spring application context from the file system. */
    public static void init(){

        applicationContext = new FileSystemXmlApplicationContext("/resources/applicationContext.xml");

    }
}
现在是直接在spring配置文件中配置thrift service的bean,调用代码跟一般的bean一模一样

TestService.java 代码:




    /**
     * Calls {@code showThriftResult} on the {@code IThriftTestService.Iface}
     * bean taken from the Spring context. The bean is a dynamic proxy, so the
     * remote call reads like a call on any other Spring bean. A
     * {@code TException} is printed and otherwise ignored.
     */
    public void doThriftTest(){
        final String callerName = "zhuangjiesen ...IThriftTestService doing...";

        final IThriftTestService.Iface thriftTestService =
                BeanHelper.getContext().getBean(IThriftTestService.Iface.class);

        final Map<String, String> payload = new HashMap<String, String>();
        payload.put("name", "庄杰森");
        payload.put("IThriftTestService", "client");
        payload.put("content", "thrift 的 rpc 调用");

        try {
            thriftTestService.showThriftResult(callerName, true, payload);
        } catch (TException thriftFailure) {
            thriftFailure.printStackTrace();
        }
    }



    /**
     * Calls {@code showInfoData} on the {@code IThriftInfoTestService.Iface}
     * bean taken from the Spring context. The bean is a dynamic proxy, so the
     * remote call reads like a call on any other Spring bean. A
     * {@code TException} is printed and otherwise ignored.
     */
    public void doThriftInfoTest(){
        final String callerName = "zhuangjiesen ...IThriftInfoTestService doing...";

        final IThriftInfoTestService.Iface thriftInfoTestService =
                BeanHelper.getContext().getBean(IThriftInfoTestService.Iface.class);

        final Map<String, String> payload = new HashMap<String, String>();
        payload.put("name", "庄杰森");
        payload.put("IThriftInfoTestService", "client");
        payload.put("content", "thrift 的 rpc 调用");

        try {
            thriftInfoTestService.showInfoData(callerName, true, payload);
        } catch (TException thriftFailure) {
            thriftFailure.printStackTrace();
        }
    }


客户端 thrift 的 spring 配置文件:spring-thrift.xml

代码:



<?xml version="1.0" encoding="UTF-8"?>
<!-- Client side Spring context: declares the Thrift connection pool, the
     service manager that uses it, and one factory bean per remote service. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:p="http://www.springframework.org/schema/p"
    xmlns:mvc="http://www.springframework.org/schema/mvc" 
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd    
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd    
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd  
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd            http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd"
>

    <!-- Thrift connection pool: server address plus the lower and upper bound
         of pooled connections. Port 29999 is the port the provider listens on. -->
    <bean id="thriftConnectionPool" class="com.java.core.rpc.thrift.supports.ThriftConnectionPool">
        <property name="host" value="127.0.0.1"/>
        <property name="port" value="29999"/>
        <property name="maxConnections" value="20"/>
        <property name="minConnections" value="5"/>

    </bean>


    <!-- Service manager: creates the client proxies on top of the pool -->
    <bean id="thriftServiceManager" class="com.java.core.rpc.thrift.supports.ThriftServiceManager" >
        <property name="thriftConnectionPool" ref="thriftConnectionPool"></property>


    </bean>

    <!-- Each remote service is configured through a FactoryBean that returns a
         dynamic proxy of the service interface.
         NOTE(review): serviceIfaceClass names the nested Iface type with a '.'
         instead of the binary '$' form; presumably this relies on Spring
         resolving source-style nested class names - confirm. -->
    <bean id="thriftTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
        <property name="thriftServiceManager" ref="thriftServiceManager" ></property>
        <property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftTestService.Iface" ></property>

    </bean>


    <bean id="thriftInfoTestService" class="com.java.core.rpc.thrift.supports.ThriftSpringFactoryBean">
        <property name="thriftServiceManager" ref="thriftServiceManager" ></property>
        <property name="serviceIfaceClass" value="com.java.core.rpc.thrift.service.IThriftInfoTestService.Iface" ></property>

    </bean>

</beans>


ThriftSpringFactoryBean.java:关于 spring 容器中 FactoryBean 接口的用法可以自行查阅,写法可对照 org.springframework.aop.framework.ProxyFactoryBean

代码


public class ThriftSpringFactoryBean<T> implements FactoryBean<T> {

    private ThriftServiceManager thriftServiceManager;

    private Class serviceIfaceClass;

    public T getObject() throws Exception {
        return (T) thriftServiceManager.getThriftClient(serviceIfaceClass);
    }

    public Class<T> getObjectType() {
        return serviceIfaceClass;
    }

    public boolean isSingleton() {
        return true;
    }


    public ThriftServiceManager getThriftServiceManager() {
        return thriftServiceManager;
    }

    public void setThriftServiceManager(ThriftServiceManager thriftServiceManager) {
        this.thriftServiceManager = thriftServiceManager;
    }

    public Class getServiceIfaceClass() {
        return serviceIfaceClass;
    }

    public void setServiceIfaceClass(Class serviceIfaceClass) {
        this.serviceIfaceClass = serviceIfaceClass;
    }
}

ThriftServiceManager 服务管理类

代码:


/**
 * Creates client-side proxies for Thrift service interfaces. Every proxy
 * forwards its calls through a {@code ThriftServiceProxyInvocation} that takes
 * its connections from the configured {@code ThriftConnectionPool}.
 */
public class ThriftServiceManager {

    /** Pool handed to every proxy; must be injected before proxies are created. */
    private ThriftConnectionPool thriftConnectionPool;


    /**
     * Builds a new dynamic proxy implementing the given service interface.
     *
     * @param serviceIfaceClass the Thrift service interface to proxy
     * @param <T>               type of that interface
     * @return a new proxy instance; a fresh one is created on every call
     * @throws IllegalArgumentException if the argument is {@code null} or not an
     *         interface
     * @throws IllegalStateException    if no connection pool has been set
     */
    public <T> T getThriftClient(Class<T> serviceIfaceClass){
        if (serviceIfaceClass == null) {
            throw new IllegalArgumentException("serviceIfaceClass must not be null");
        }
        if (!serviceIfaceClass.isInterface()) {
            throw new IllegalArgumentException(
                    "类型错误: " + serviceIfaceClass.getName() + " is not an interface");
        }
        if (thriftConnectionPool == null) {
            throw new IllegalStateException("thriftConnectionPool must be set");
        }
        ThriftServiceProxyInvocation proxyInvocation =new ThriftServiceProxyInvocation();
        // Interface whose calls are proxied
        proxyInvocation.setIfaceClazz(serviceIfaceClass);
        // Pool the handler borrows connections from
        proxyInvocation.setThriftConnectionPool(thriftConnectionPool);
        // Create the remote service proxy; cast() keeps the conversion checked
        Object proxy = Proxy.newProxyInstance(
                serviceIfaceClass.getClassLoader(), new Class<?>[]{ serviceIfaceClass }, proxyInvocation);
        return serviceIfaceClass.cast(proxy);
    }


    public ThriftConnectionPool getThriftConnectionPool() {
        return thriftConnectionPool;
    }

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }
}


ThriftServiceProxyInvocation.java 动态代理类,获取连接并调用远程方法

代码:


public class ThriftServiceProxyInvocation implements InvocationHandler {

    /*thrift 服务类的iface 类*/
    private Class ifaceClazz;
    /* thrift 连接池*/
    private ThriftConnectionPool thriftConnectionPool;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // TODO Auto-generated method stub
        System.out.println(" ThriftServiceProxyInvocation  invoke doing before ....");
        if (ifaceClazz == null) {
            return null;
        }
        Object result = null;
        try {
            String serviceIfaceClassName = ifaceClazz.getName();
            String serviceClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,"");
            String serviceClientClassName = serviceIfaceClassName.replace(ThriftConstant.IFACE_NAME,ThriftConstant.CLIENT_NAME);
            Class clientClazz = Class.forName(serviceClientClassName);
            // 连接池中选择 protocol
            TProtocol protocol = thriftConnectionPool.getProtocol(serviceClassName);
            Object clientInstance= clientClazz.getConstructor(TProtocol.class).newInstance(protocol);
            result=method.invoke(clientInstance, args);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        } finally {
            // 回收 protocol
            thriftConnectionPool.recycleProtocol();
        }

        System.out.println(" ThriftServiceProxyInvocation  invoke doing after ....");

        return result;
    }



    public Class getIfaceClazz() {
        return ifaceClazz;
    }


    public void setIfaceClazz(Class ifaceClazz) {
        this.ifaceClazz = ifaceClazz;
    }


    public ThriftConnectionPool getThriftConnectionPool() {
        return thriftConnectionPool;
    }

    public void setThriftConnectionPool(ThriftConnectionPool thriftConnectionPool) {
        this.thriftConnectionPool = thriftConnectionPool;
    }

}

接下来就是重点 thrift 连接池 ThriftConnectionPool.java

代码:

重点方法:getProtocolInternal() 获取连接

/**
 * Pool of open Thrift client connections ({@code TSocket} wrapped in
 * {@code TFramedTransport} and {@code TBinaryProtocol}) to a single host/port.
 *
 * <p>Idle connections wait in a blocking queue. {@code connectionsCount} is the
 * number of live connections (idle plus borrowed); it never exceeds
 * {@code maxConnections} and is only changed while holding this object's
 * monitor. A background task periodically closes idle connections above
 * {@code minConnections}.
 *
 * <p>The connection borrowed by a thread is remembered in a thread-local, so
 * {@link #recycleProtocol()} takes no argument. As a consequence a thread can
 * track only one borrowed connection at a time.
 */
public class ThriftConnectionPool implements InitializingBean {


    private String host;
    private int port;
    private int minConnections = 5;
    private int maxConnections = 10;
    /** Live connections, idle and borrowed; guarded by {@code this}. */
    private int connectionsCount = 0;

    /** Longest time a borrower waits for a connection when the pool is exhausted. */
    private int waitQueueSeconds = 10 ;
    /** Interval between two runs of the idle-connection reaper. */
    private int recycleSeconds=10;

    /** Idle connections ready to be borrowed. */
    private final LinkedBlockingQueue<TProtocol> blockingQueue = new LinkedBlockingQueue<TProtocol>();

    /** Connection currently borrowed by the calling thread. */
    private final ThreadLocal<TProtocol> protocolLocal = new ThreadLocal<TProtocol>();

    /** Runs the periodic reaper task. */
    private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    /** Set once initialization ran; guarded by {@code this}. */
    private boolean initialized = false;
    /** Set by {@link #destroy()}; after that no connection is created or pooled. */
    private volatile boolean destroyed = false;

    public ThriftConnectionPool() {

    }

    /**
     * Opens up to {@code minConnections} connections and starts the periodic
     * reaper. Calling it again has no effect.
     */
    public synchronized void initThriftConnectionPool(){
        if (initialized) {
            return;
        }
        initialized = true;
        for (int i = 0 ; i < minConnections ; i++) {
            TProtocol protocol = createNewProtocol();
            if (protocol == null) {
                // Server unreachable or limit reached: borrowers create lazily later.
                break;
            }
            blockingQueue.add(protocol);
        }

        // Reaper: runs repeatedly, not just once, so the pool keeps shrinking
        // back to minConnections after every burst.
        long period = Math.max(1, recycleSeconds);
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    reducePool();
                } catch (RuntimeException e) {
                    // An escaping exception would cancel all future runs.
                    e.printStackTrace();
                }
            }
        }, period, period, TimeUnit.SECONDS);

    }

    /**
     * Opens a new connection if the pool is below {@code maxConnections}.
     *
     * @return the opened protocol, or {@code null} when the limit is reached,
     *         the pool is destroyed, or the connection attempt failed
     */
    public synchronized TProtocol createNewProtocol(){
        if (destroyed || connectionsCount >= maxConnections) {
            return null;
        }
        try {
            TSocket socket = new TSocket(host,port);
            TFramedTransport framedTransport = new TFramedTransport(socket);
            TBinaryProtocol binaryProtocol = new TBinaryProtocol(framedTransport);
            binaryProtocol.getTransport().open();
            connectionsCount ++ ;
            return binaryProtocol;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Borrows a connection for the calling thread: an idle one if available,
     * else a new one, else whatever is returned within {@code waitQueueSeconds}.
     *
     * @return an open protocol, or {@code null} if none became available in time
     */
    public TProtocol getProtocolInternal(){
        protocolLocal.remove();
        TProtocol protocol = blockingQueue.poll();
        // Drop idle connections that were closed while pooled.
        while (protocol != null && !isUsable(protocol)) {
            discard(protocol);
            protocol = blockingQueue.poll();
        }
        if (protocol == null) {
            protocol = createNewProtocol();
        }
        if (protocol == null) {
            protocol = awaitIdleProtocol();
        }
        protocolLocal.set(protocol);
        return protocol;
    }

    /**
     * Blocks on the idle queue until a usable connection shows up or the wait
     * time is over. Blocking replaces a busy-spinning thread queue; strict FIFO
     * order between waiters is not guaranteed.
     */
    private TProtocol awaitIdleProtocol() {
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(waitQueueSeconds);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return null;
                }
                TProtocol protocol = blockingQueue.poll(remaining, TimeUnit.NANOSECONDS);
                if (protocol == null) {
                    return null;
                }
                if (isUsable(protocol)) {
                    return protocol;
                }
                // A dead connection frees one slot: try to fill it right away.
                discard(protocol);
                protocol = createNewProtocol();
                if (protocol != null) {
                    return protocol;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Borrows a connection and wraps it so calls are routed to the named service
     * of a multiplexed server.
     *
     * @param serviceName name the service is registered under on the server
     * @return a multiplexed protocol on top of the borrowed connection
     * @throws IllegalStateException if no connection became available
     */
    public TProtocol getProtocol(String serviceName){
        TProtocol protocol = getProtocolInternal();
        if (protocol == null) {
            throw new IllegalStateException("No thrift connection to " + host + ":" + port
                    + " available within " + waitQueueSeconds + " seconds");
        }
        return new TMultiplexedProtocol(protocol, serviceName);
    }

    /**
     * Returns the calling thread's borrowed connection to the pool. A connection
     * that is no longer open is discarded and, if possible, replaced by a new
     * one so waiting borrowers are served. Does nothing if nothing was borrowed.
     */
    public void recycleProtocol(){
        TProtocol protocol = protocolLocal.get();
        protocolLocal.remove();
        if (protocol == null) {
            return;
        }
        if (!isUsable(protocol)) {
            discard(protocol);
            protocol = createNewProtocol();
            if (protocol == null) {
                return;
            }
        }
        if (destroyed) {
            discard(protocol);
        } else {
            blockingQueue.add(protocol);
        }
    }

    /** A protocol is usable when its transport exists and is open. */
    private boolean isUsable(TProtocol protocol) {
        return protocol.getTransport() != null && protocol.getTransport().isOpen();
    }

    /** Closes a connection and releases its slot in the connection count. */
    private synchronized void discard(TProtocol protocol) {
        try {
            if (protocol.getTransport() != null && protocol.getTransport().isOpen()) {
                protocol.getTransport().close();
            }
        } catch (RuntimeException e) {
            e.printStackTrace();
        }
        if (connectionsCount > 0) {
            connectionsCount --;
        }
    }


    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public int getMinConnections() {
        return minConnections;
    }

    public void setMinConnections(int minConnections) {
        this.minConnections = minConnections;
    }

    public int getMaxConnections() {
        return maxConnections;
    }

    public void setMaxConnections(int maxConnections) {
        this.maxConnections = maxConnections;
    }


    public int getWaitQueueSeconds() {
        return waitQueueSeconds;
    }

    public void setWaitQueueSeconds(int waitQueueSeconds) {
        this.waitQueueSeconds = waitQueueSeconds;
    }

    public int getRecycleSeconds() {
        return recycleSeconds;
    }

    public void setRecycleSeconds(int recycleSeconds) {
        this.recycleSeconds = recycleSeconds;
    }

    public void afterPropertiesSet() throws Exception {
        initThriftConnectionPool();
    }


    /**
     * Last-resort cleanup. Finalization is not guaranteed to run, so call
     * {@link #destroy()} explicitly (for example as a Spring destroy-method).
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            destroy();
        } finally {
            super.finalize();
        }
    }

    /**
     * Stops the reaper and closes every idle connection without blocking.
     * Connections still borrowed are closed when they are recycled.
     */
    public synchronized void destroy(){
        destroyed = true;
        scheduledExecutorService.shutdownNow();
        TProtocol protocol = blockingQueue.poll();
        while (protocol != null) {
            discard(protocol);
            protocol = blockingQueue.poll();
        }
    }



    /**
     * Closes idle connections until the pool is back at {@code minConnections}
     * or no idle connection is left. Never waits for borrowed connections,
     * because that would hold the monitor and block new connections.
     */
    private synchronized void reducePool(){
        int surplus = connectionsCount - minConnections;
        while (surplus > 0) {
            TProtocol idle = blockingQueue.poll();
            if (idle == null) {
                // Everything else is currently borrowed.
                break;
            }
            discard(idle);
            surplus --;
        }
    }


}