OkHttp3线程池相关之Dispatcher中的ExecutorService

2,314 阅读6分钟

先从Okhttp3的请求实现说起(包括同步请求和异步请求)

基本使用

同步请求:

private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
        .url(DESTINATION_ADDRESS)
        .build();
Response response = client.newCall(request).execute();

异步请求:

private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
        .url(DESTINATION_ADDRESS)
        .build();
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
    }
});

小结:

  • 不管是同步请求还是异步请求,都需要初始化OkHttpClient以及创建一个Request,然后再调用OkHttpClinet的newCall方法,创建出一个RealCall对象
  • 对于同步请求,是调用RealCall的execute方法;而异步请求,则是调用RealCall的enqueue方法实现

简介

  • OkHttpCLient:在Okhttp库中,OkHttpClient处于一个中心者的地位,很多功能都需要通过它来转发或者实现的。在创建的时候,初始化了很多功能类,比如缓存,拦截器,网络连接池,分发器等类。 由于初始化比较复杂,OkHttpClient内部提供了Builder模式来创建,一般情况下,OkHttpClient是唯一的
  • Request:是用来构建一个请求对象的,符合Http请求的标准,包含了请求头,方法等等属性,较为复杂,因此同样提供Builder模式构建。
  • Response:是用来构建一个响应对象的,包含了响应头,响应码,数据等等属性,同样也提供Builder模式构建

RealCall

// 同步
client.newCall(request).execute()
// 异步
client.newCall(request).enqueue(Callback)

同步和异步请求,都是调用OkHttpClient的newCall方法创建一个RealCall对象,然后通过这个对象,执行请求的

@Override 
public Call newCall(Request request) {
    return new RealCall(this, request);
}

execute 同步请求

@Override 
public Response execute() throws IOException {
     // 方法检测,execute只能调用1次
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    try {
       // 将RealCall添加到Dispatcher中的同步请求队列中
      client.dispatcher().executed(this);
       // 获取响应的数据
       // Okhttp会对数据(请求流和响应流)进行拦截进行一些额外的处理,
       // 比如失败重连,添加请求头,没网络优先返回缓存数据等等。
       // 这一拦截过程,采用责任链模式来实现的,和Android中的事件传递机制差不多
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
       // 请求完成或者取消,都会将RealCall从同步请求队列中移除
      client.dispatcher().finished(this);
    }
}

enqueue-异步请求

@Override 
public void enqueue(Callback responseCallback) {
    // 和execute一样,enqueue也只能调用1次
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    // 执行请求
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

AsyncCall

AsyncCall–实际上是一个Runnable

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

NamedRunnable,继承Runnable的抽象类,根据传入的名称来修改线程名称,抽象出执行任务的execute方法

final class AsyncCall extends NamedRunnable {
    // 其他省略,execute方法式真正执行请求
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}

AsyncCall中的execute方法,是请求任务执行的方法,获取相应数据最终也是调用了getResponseWithInterceptorChain方法。

private Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!retryAndFollowUpInterceptor.isForWebSocket()) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(
        retryAndFollowUpInterceptor.isForWebSocket()));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

Dispatcher

原理:

  • 对于异步请求来说,Dispatcher类内部持有一个正在执行(运行中)异步任务的队列和一个等待执行(就绪)异步请求的队列,以及一个线程池
  • 这个线程池,没有常存的核心线程,最多线程数为Integer.MAX_VALUE,线程空闲时存活时间为60秒,而SynchronousQueue #2 是不保存任务的,所以只要把任务添加进去就会执行
  • OkHttp不是在线程池中维护线程的个数,线程是一直在Dispatcher中直接控制。线程池中的请求都是运行中的请求。这也就是说线程的重用不是线程池控制的,通过源码我们发现线程重用的地方是请求结束的地方finished(AsyncCall call) ,而真正的控制是通过promoteCalls方法, 根据maxRequests和maxRequestsPerHost来调整runningAsyncCalls和readyAsyncCalls,使运行中的异步请求不超过两种最大值,并且如果队列有空闲,将就绪状态的请求归类为运行中
public final class Dispatcher {
  //最大请求数
  private int maxRequests = 64;
  //相同host最大请求数
  private int maxRequestsPerHost = 5;
  //请求执行,懒加载
  private ExecutorService executorService;
  //就绪状态的异步请求队列
  private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
  //运行中的异步请求队列
  private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
  //进行中的同步请求,包括那些尚未完成被取消的请求
  private final Deque<Call> executedCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService getExecutorService() {
    if (executorService == null) {
       //corePoolSize 为 0表示,没有核心线程,所有执行请求的线程,使用完了如果过期了(keepAliveTime)就回收了,
      //maximumPoolSize 无限大的线程池空间
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

  public synchronized void setMaxRequests(int maxRequests) {
    if (maxRequests < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequests);
    }
    this.maxRequests = maxRequests;
    promoteCalls();
  }

  public synchronized int getMaxRequests() {
    return maxRequests;
  }

  public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
    if (maxRequestsPerHost < 1) {
      throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
    }
    this.maxRequestsPerHost = maxRequestsPerHost;
    promoteCalls();
  }

  public synchronized int getMaxRequestsPerHost() {
    return maxRequestsPerHost;
  }

  synchronized void enqueue(AsyncCall call) {
    if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningCalls.add(call);
      getExecutorService().execute(call);
    } else {
      readyCalls.add(call);
    }
  }

  //取消带有tag的所有请求
  public synchronized void cancel(Object tag) {
    for (AsyncCall call : readyCalls) {
      if (Util.equal(tag, call.tag())) {
        call.cancel();
      }
    }

    for (AsyncCall call : runningCalls) {
      if (Util.equal(tag, call.tag())) {
        call.get().canceled = true;
        HttpEngine engine = call.get().engine;
        if (engine != null) engine.cancel();
      }
    }

    for (Call call : executedCalls) {
      if (Util.equal(tag, call.tag())) {
        call.cancel();
      }
    }
  }

  //异步请求结束
  //当该异步请求结束的时候,会调用此方法,
  //用于将运行中的异步请求队列中的该请求移除并调整请求队列
  //此时就绪队列中的请求就可以进入运行中的队列
  synchronized void finished(AsyncCall call) {
    if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
    promoteCalls();
  }

  //根据maxRequests和maxRequestsPerHost来调整
  //runningAsyncCalls和readyAsyncCalls
  //使运行中的异步请求不超过两种最大值
  //并且如果队列有空闲,将就绪状态的请求归类为运行中。
  private void promoteCalls() {
    //运行中的异步请求队列的请求数大于最大请求数,那么就没必要去将就绪状态的请求移动到运行中。
    //其实就是说,如果有超过最大请求数的请求正在运行,是不需要将其移出队列的,继续运行完即可
    if (runningCalls.size() >= maxRequests) return;
    //如果就绪的队列为空,那就更没有必要移动了,因为都没有。
    if (readyCalls.isEmpty()) return; 
    //遍历就绪队列
    for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
      //取出一个请求
      AsyncCall call = i.next();
      //如果当前请求对应的host下,没有超过maxRequestsPerHost
      //那么将其从就绪队列中移除,并加入在运行队列。
      if (runningCallsForHost(call) < maxRequestsPerHost) { 
        //移除        
        i.remove();
        //加入运行队列
        runningCalls.add(call);
        //立即执行该请求
        getExecutorService().execute(call);
      }
        //如果运行队列已经到达了最大请求数上限
        //此时如果还有就绪中的请求,也不管了
      if (runningCalls.size() >= maxRequests) return; 
    }
  }

  //对比已有的运行中的请求和当前请求的host
  private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningCalls) {
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

  synchronized void executed(Call call) {
    executedCalls.add(call);
  }

  //同步请求结束
  //当该同步请求结束的时候,会调用此方法,
  //用于将运行中的同步请求队列中的该请求移除
  synchronized void finished(Call call) {
    if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
  }

  public synchronized int getRunningCallCount() {
    return runningCalls.size();
  }

  public synchronized int getQueuedCallCount() {
    return readyCalls.size();
  }
}