阅读 417

多线程应用--Http请求阻塞回调处理

1.需求描述

1.1场景说明:

由于,微信端的业务需求量越来越大.将业务与微信第三方事件处理耦合在一起的单一项目的结构已经逐渐暴露出,承载能力不足的缺点.所以,需要将与微信的交互从业务逻辑中分离出,单独进行管理和处理.
这样做有以下几点好处:

  1. 可以达到业务解耦分离.
  2. 可以为业务系统微服务化做准备.
  3. 可以在解耦后针对性的对不同业务系统进行优化.
  4. 减少业务系统错误的影响面.

1.2技术难点说明:

微信中的通过http调用客户配置的回调地址的方式来进行事件通知. 事件通知分为两种类型:

  1. 发送http请求和数据以后,客户服务器默认回复success字符串.然后,业务系统业务处理完成后通过指定的http地址通知微信方
  2. 发送http请求和数据的同一个请求中需要客户服务器在response中返回业务处理的结果.

由于,我们已经将事件通知从主业务系统中抽离出.所以,微信事件管理系统会在收到微信事件通知以后,通过mq的方式进行发布事件.但是,事件通知的第二种类型需要在一个http请求中将业务处理数据带回微信方.此时,就需要微信事件管理系统阻塞微信方发送来的http请求,直到业务系统处理完业务数据并返回给系统.
这样,我们就需要有一个灵活,可靠的容器对被阻塞的微信方请求进行管理.

2.理论基础

2.1Future多线程模型:

2.1.1模型介绍:

future多线程模型,是一种比较常用的多线程阻塞回调的模式.具体逻辑结构如下图所示:

future时序图.png

具体处理逻辑是这样的.当一个请求发送到一个future模式的入口以后,此时这个线程是阻塞的.这时future的线程会进行后面的回调业务,或者是直接开始等待.直到,其他线程唤醒future线程或者future等待超时.此时,future线程唤醒,然后将具体的结果返回给调用线程.

2.2线程间通信:

2.2.1wait,notify,notifyAll

线程间通信有很多种方式,这次用到的是比较简单的wait notify notifyall组合.这3个方法是object类中的3个方法.他们控制的目标是对于这个实例的控制.所以,需要线程在获取到这个对象操作的monitor以后才能控制.一般使用的方法是通过synchronized关键字获取对象锁再调用这3个方法.如果,没有在同步代码块中执行,这时候java会报IllegalMonitorStateException异常.这样主要是为了控制当同一个对象实例被多个线程占用以后的操作问题.可以避免不同步的情况产生.

2.2.1.1wait

wait方法主要是用来将这个对象实例上的当前线程进行挂起.可以输入timeout时间,超过timeout时间以后线程会自动唤醒

2.2.1.2notify

notify方法用来唤醒在对应的对象实例上休眠的线程,但是需要注意的是,这个是非公平的.具体唤醒哪一个线程由jvm自行决定

2.2.1.3notifyall

notifyall方法顾名思义,是将在这个实例对象上所有挂起的线程唤醒.

3.实现思路:

  1. 容错能力:由于需要给多个业务服务提供消息分发,消息回复.需要有业务系统超时的处理能力.所以,提供的阻塞服务都会有timeout设定.
  2. 持续服务能力:我们需要提供持续稳定的服务.在项目中.对阻塞的请求会有一个溢出的管理.如果超出某个最大值,先入的请求就会被直接返回默认值.所以,在业务服务中需要自行处理幂等的问题,避免业务处理完成后,但是由于溢出导致业务处理失败.这样就会导致业务服务数据或者业务出现问题

4.具体实现:

ThreadHolder(消息载体):

import lombok.Data;

import java.util.concurrent.Callable;

/**
 * <p>
 * Description: com.javanewb.service
 * </p>
 * date:2017/10/31
 *
 * @author Dean.Hwang
 */
@Data
public abstract class ThreadHolder<T> implements Callable<T> {
    protected abstract T proData();//TODO 正常逻辑处理,以及默认数据返回

    private T defaultData;//返回的默认数据
    private Object needProData;//接受到需要处理的数据
    private Long createTime = System.currentTimeMillis();
    private Long maxWaitTime;
    private String mdc;
    private RequestHolder<T> holder;

    @Override
    public T call() throws Exception {
        waitThread();
        System.out.println("Thread mdc:" + mdc + "  notify");
        if (needProData == null) {
            holder.removeThread(mdc, false);
            return defaultData;
        }
        return proData();
    }

    public synchronized void waitThread() throws InterruptedException {
        this.wait(maxWaitTime);
    }

    public synchronized void notifyThread(Object needProData) {
        this.needProData = needProData;
        this.notify();
    }

    public synchronized void notifyDefault() {
        this.notify();
    }
}
复制代码

RequestHolder(请求管理容器):


import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>
 * Description: com.javanewb.entity
 * </p>
 * date:2017/10/26
 *
 * @author Dean.Hwang
 */
public class RequestHolder<T> {
    private Integer maxSize;
    private Long waitTime;

    public RequestHolder(Integer maxSize, Long maxWait, ExecutorService executorService) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than max size num");
        }
        this.maxSize = maxSize;
        this.waitTime = maxWait;
        if (executorService != null) {
            this.executorService = executorService;
        } else {
            this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
        }
    }

    public RequestHolder(Integer maxSize, Long maxWait) {
        if (maxSize > 1000) {
            throw new BusinessException(1022, "Bigger than  max size num");
        }
        this.waitTime = maxWait;
        this.maxSize = maxSize;
        this.executorService = new ThreadPoolExecutor(Math.max(1, maxSize / 5), maxSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(maxSize));
    }

    private ExecutorService executorService;
    private final Map<String, ThreadHolder<T>> holderMap = new ConcurrentHashMap<>();
    private List<String> mdcOrderList = new CopyOnWriteArrayList<>();
    private AtomicBoolean isCleaning = new AtomicBoolean(false);

    public ThreadHolder<T> removeThread(String mdc, boolean needNotifyDefault) {
        mdcOrderList.remove(mdc);
        ThreadHolder<T> holder;
        synchronized (holderMap) {
            holder = holderMap.get(mdc);
            holderMap.remove(mdc);
        }
        if (holder != null && needNotifyDefault) {
            holder.notifyDefault();
        }
        return holder;
    }

    public void notifyThread(String mdc, Object data) {
        ThreadHolder<T> holder = removeThread(mdc, false);
        if (holder != null) {
            holder.notifyThread(data);
        }
    }


    public Future<T> getFuture(String mdcStr, Class<? extends ThreadHolder<T>> holder) {
        if (StringUtil.isEmpty(mdcStr) || holder == null) {
            throw new BusinessException(1020, "Mdc target missing!!!");
        }
        Future<T> future;
        try {
            ThreadHolder<T> thread = holder.newInstance();
            holderMap.put(mdcStr, thread);
            mdcOrderList.add(mdcStr);
            thread.setMaxWaitTime(waitTime);
            thread.setMdc(mdcStr);
            thread.setHolder(this);
            future = executorService.submit(thread);
            cleanThreadPool();
        } catch (InstantiationException | IllegalAccessException e) {
            holderMap.remove(mdcStr);
            mdcOrderList.remove(mdcStr);
            throw new BusinessException(1021, "Thread Holder initialized failed");
        }
        return future;
    }

    private void cleanThreadPool() {
        if (mdcOrderList.size() >= maxSize && isCleaning.compareAndSet(false, true)) {

            try {
                mdcOrderList.subList(0, mdcOrderList.size() - maxSize).forEach(//看测试效率,看是否用并行stream处理
                        mdc -> removeThread(mdc, true)
                );
            } finally {
                isCleaning.set(false);
            }
        }
    }
}
复制代码

TestController(测试入口):

import com.javanewb.entity.TestThreadHolder;
import com.javanewb.thread.tools.RequestHolder;
import com.keruyun.portal.common.filter.LoggerMDCFilter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * <p>
 * Description: com.javanewb.controller
 * </p>
 * <p>
 * Copyright: Copyright (c) 2015
 * </p>
 * <p>
 
 * </p>
 * date:2017/10/25
 *
 * @author Dean.Hwang
 */
@Api
@RestController
@Slf4j
public class TestController {
    private RequestHolder<String> holder = new RequestHolder<>(100, 500000L);
    private List<String> mdcList = new ArrayList<>();

    @ApiOperation(value = "请求同步测试", notes = "请求同步测试")
    @RequestMapping(value = "/async", method = RequestMethod.GET)
    public void async(HttpServletRequest request, HttpServletResponse response, String id) {
        Long startTime = System.currentTimeMillis();
        String mdc = MDC.get(LoggerMDCFilter.IDENTIFIER);
        mdcList.add(mdc);
        Future<String> future = holder.getFuture(id, TestThreadHolder.class);
        log.info(Thread.currentThread().getName());
        try {
            System.out.println(mdc + " Thread Wait");
            String result = future.get();
            response.getOutputStream().print(result);
            System.out.println(" time: " + (System.currentTimeMillis() - startTime));
        } catch (IOException | ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    @ApiOperation(value = "释放list第一个", notes = "请求同步测试")
    @RequestMapping(value = "/notify", method = RequestMethod.GET)
    public String notifyFirst() {
        String mdc = mdcList.get(0);
        mdcList.remove(0);
        holder.notifyThread(mdc, "");
        return mdc;
    }

    @ApiOperation(value = "释放list第一个", notes = "请求同步测试")
    @RequestMapping(value = "/notifyThis", method = RequestMethod.GET)
    public String notifyThis(String mdc) {
        int idx = 0;
        for (int i = 0; i < mdcList.size(); i++) {
            if (mdcList.get(i).equals(mdc)) {
                idx = i;
                break;
            }
        }
        mdcList.remove(idx);
        holder.notifyThread(mdc, "");
        return mdc;
    }
}
复制代码

5.后话:

本项目会放在github上,如果有兴趣,或者发现有bug需要处理的可以直接从博客联系我,也可以直接去github 地址:github.com/crowhyc/Thr… 邮箱:crowhyc@163.com

关注下面的标签,发现更多相似文章
评论