OkHttp 源码剖析系列(一)——请求的发起及拦截器机制概述

1,602 阅读15分钟

你好,我是 N0tExpectErr0r,一名热爱技术的 Android 开发

我的个人博客:blog.N0tExpectErr0r.cn

OkHttp 源码剖析系列文章目录:

OkHttp 源码剖析系列(一)——请求的发起及拦截器机制概述

OkHttp 源码剖析系列(二)——拦截器整体流程分析

OkHttp 源码剖析系列(三)——缓存机制

OkHttp 源码剖析系列(四)——连接建立概述

OkHttp 源码剖析系列(五)——代理路由选择

OkHttp 源码剖析系列(六)——连接复用机制及连接的建立

OkHttp 源码剖析系列(七)——请求的发起及响应的读取

OkHttp 是一个从入门 Android 时就接触的网络请求库了,想想现在也陪伴它快两年了,却没有系统性地对它进行过一次系统性的源码解析。因此准备开设这样一个系列,对 OkHttp 的源码进行解析。

此篇源码解析基于 OkHttp 3.14

OkHttpClient

我们都知道,使用 OkHttp 我们首先需要创建并获得一个 OkHttpClientOkHttpClient 是 OkHttp 中十分重要的一个类,下面是官方在 Java Doc 中对它的介绍:

Factory for {@linkplain Call calls}, which can be used to send HTTP requests and read their responses.

OkHttpClients should be shared OkHttp performs best when you create a single {@code OkHttpClient} instance and reuse it for all of your HTTP calls. This is because each client holds its own connection pool and thread pools. Reusing connections and threads reduces latency and saves memory. Conversely, creating a client for each request wastes resources on idle pools.

根据官方对其的介绍可以看出,它是一个 Call 的工厂类,可以用它来生产 Call,从而通过 Call 来发起 HTTP Request 获取 Response。

同时,官方推荐的使用方式是使用一个全局的 OkHttpClient 在多个类之间共享。因为每个 Client 都会有一个自己的连接池和线程池,复用 Client 可以减少资源的浪费。

它的构建采用了 Builder 模式,提供了许多可供我们配置的参数:

public static final class Builder {
    Dispatcher dispatcher;
    @Nullable
    Proxy proxy;
    List<Protocol> protocols;
    List<ConnectionSpec> connectionSpecs;
    final List<Interceptor> interceptors = new ArrayList<>();
    final List<Interceptor> networkInterceptors = new ArrayList<>();
    EventListener.Factory eventListenerFactory;
    ProxySelector proxySelector;
    CookieJar cookieJar;
    @Nullable
    Cache cache;
    @Nullable
    InternalCache internalCache;
    SocketFactory socketFactory;
    @Nullable
    SSLSocketFactory sslSocketFactory;
    @Nullable
    CertificateChainCleaner certificateChainCleaner;
    HostnameVerifier hostnameVerifier;
    CertificatePinner certificatePinner;
    Authenticator proxyAuthenticator;
    Authenticator authenticator;
    ConnectionPool connectionPool;
    Dns dns;
    boolean followSslRedirects;
    boolean followRedirects;
    boolean retryOnConnectionFailure;
    int callTimeout;
    int connectTimeout;
    int readTimeout;
    int writeTimeout;
    int pingInterval;
    // ...
}

可以看到,它的可配置的参数还是非常多的。

构建了 OkHttpClient 之后,我们可以通过 OkHttpClient.newCall 方法根据我们传入的 Request 创建对应的 Call

/**
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override
public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

Request

Request 所对应的就是我们 HTTP 请求中的 Request,可以对它的 url、method、header 等在 Builder 中进行配置。

Request 的构建同样采用了 Builder 模式进行构建:

public static class Builder {
    @Nullable
    HttpUrl url;
    String method;
    Headers.Builder headers;
    @Nullable
    RequestBody body;
    // ...
}

构建完 Request 后,就可以调用 OkHttpClient.newCall 方法创建对应 Call

Call

构建

我们知道,newCall 方法中调用了 RealCall.newRealCall(this, request, false /* for web socket */);,其中第三个参数代表是否使用 web socket。

我们看看 RealCall.newRealCall 方法:

static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    // Safely publish the Call instance to the EventListener.
    RealCall call = new RealCall(client, originalRequest, forWebSocket);
    call.transmitter = new Transmitter(client, call);
    return call;
}

这里根据我们传入的参数构建了一个 RealCall 对象,并根据 client 构建了其 transmitter

RealCall 的构造函数中主要是一些赋值:

private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    this.client = client;
    this.originalRequest = originalRequest;
    this.forWebSocket = forWebSocket;
}

Transmitter 中也是一些赋值操作:

public Transmitter(OkHttpClient client, Call call) {
    this.client = client;
    this.connectionPool = Internal.instance.realConnectionPool(client.connectionPool());
    this.call = call;
    this.eventListener = client.eventListenerFactory().create(call);
    this.timeout.timeout(client.callTimeoutMillis(), MILLISECONDS);
} 

其中首先调用了 Internal.instance.realConnectionPool 方法,通过 client.connectionPool 获取到了 RealConnectionPool 对象,之后调用了 client.eventListenerFactory().create(call) 方法构创建了其 eventListener

请求的发起

OkHttp 的执行有两种方式,enqueueexecute,它们分别代表了异步请求与同步请求:

  • enqueue:代表了异步请求,不会阻塞调用线程。需要我们传入一个 Callback,当请求成功时,会回调其 onResponse 方法,请求失败时则会回调其 onFailure 方法。

  • execute:代表了同步请求,会阻塞调用线程,请求结束后直接返回请求结果。

让我们分别对其进行分析:

异步请求

我们先分析一下 enqueue 方法:

@Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    // 通知eventListener
    transmitter.callStart();
    // 构建AsyncCall并分派任务
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

它首先调用了 transmitter.callStart,最后调用到了之前构造的 eventListenercallStart 方法

之后它调用了 client.dispatcher().enqueue 方法,构建了一个 AsyncCall 对象后交给了 client.dispatcher 进行任务的分派。

executeOn

AsyncCall 类对外暴露了 executeOn 方法,Dispatcher 可以通过调用该方法并传入 ExecutorService 使其在该线程池所提供的线程中发起 HTTP 请求,获取 Response 并回调 Callback 的对应方法从而实现任务的调度。

void executeOn(ExecutorService executorService) {
    assert (!Thread.holdsLock(client.dispatcher()));
    boolean success = false;
    try {
    	// 在对应的ExecutorService中执行该AsyncCall
        executorService.execute(this);
        success = true;
    } catch (RejectedExecutionException e) {
   		// 出现问题,调用Callback对应方法
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
    } finally {
    	// 不论是否成功,通知Dispatcher请求完成
        if (!success) {
            client.dispatcher().finished(this); // This call is no longer running!
        }
    }
}

可以看出,AsyncCall 是一个 Runnable,我们看看它实现的 execute 方法:

@Override
protected void execute() {
    boolean signalledCallback = false;
    // 开始Timeout计时
    transmitter.timeoutEnter();
    try {
    	// 获取Response
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        // 请求成功,通知Callback
        responseCallback.onResponse(RealCall.this, response);
    } catch (IOException e) {
        if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
        	// 请求失败,通知Callback
            responseCallback.onFailure(RealCall.this, e);
        }
    } finally {
		// 不论是否成功,通知Dispatcher请求完成
        client.dispatcher().finished(this);
    }
}

这里首先调用了 transmitter.timeoutEnter() 方法开始了 Timeout 的计时。

之后若请求成功,则会通过 getResponseWithInterceptorChain 方法获取了 Response,之后调用 Callback.onResponse 方法通知请求成功。

若请求失败,会调用 Callback.onFailure 方法通知请求失败。

看来网络请求的核心实现在 getResponseWithInterceptorChain 方法中实现,而 OkHttp 的超时机制与 transmitter.timeoutEnter 有关,我们暂时先不关注这些细节。

异步线程池

让我们来看看 OkHttp 对异步请求采用了怎样的线程池。调用者在 AsyncCall.executeOn 方法中传入了 Dispatcher.executorService 方法的返回值,我们来到此方法:

public synchronized ExecutorService executorService() {
    if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
}

我们知道,这里的线程池是可以通过创建 Dispatcher 时指定的,若不指定,则这里会创建一个如上代码中的线程池,我们来分析一下它的几个参数。

  • 核心线程数 corePoolSize:保持在线程池中的线程数,即使空闲后也不会保留。由于为 0,因此任何线程空闲时都不会被保留。
  • 最大线程数 maximumPoolSize:线程池最大支持创建的线程数,这里指定了 Integer.MAX_VALUE
  • 线程存活时间 keepAliveTime:线程空闲后所能存活的时间,若超过该时间就会被回收。这里指定的时间为 60 个时间单位(60s),也就是说线程在空闲超过 60s 后就会被回收。
  • 时间单位 unit:签名的线程存活时间的单位,这里为 TimeUnit.SECONDS,也就是说秒
  • 线程等待队列 workQueue:线程的等待队列,里面的元素会按序排队,依次执行,这里和指定的是 SynchronousQueue
  • 线程工厂 threadFactory:线程的创建工厂这里传入的是 Util.threadFactory 方法创建的线程工厂。

对于上面的几个参数,我们有几个细节需要考虑一下:

为什么要采用 SynchronousQueue

首先我们先需要了解一下什么是 SynchronousQueue,它虽然是一个队列,但它内部不存在任何的容器,它采用了一种经典的生产者-消费者模型,它有多个生产者和消费者,当一个生产线程进行生产操作(put)时,若没有消费者线程进行消费(take),那么该线程会阻塞,直到有消费者进行消费。也就是说,它仅仅实现了一个传递的操作,这种传递功能由于没有了中间的放入容器,再从容器中取出的过程,因此是一种快速传递元素的方式,这对于我们网络请求这种高频请求来说,是十分合适的。关于 SynchronousQueue 可以看这篇文章: java并发之SynchronousQueue实现原理

为什么线程池采用这种线程数量不设上限,每个线程空闲时只存活很短时间的策略

实际上在 OkHttp 的设计中,将线程的个数的维护工作不再交给线程池,而是由 Dispatcher 进行实现,通过外部所设置的 maxRequestsmaxRequestsPerHost 来调整等待队列及执行队列,从而实现对线程最大数量的控制。具体 Dispatcher 的实现在本文后面会讲到。

同步请求

execute

我们接着看到 execute 方法,看看同步请求的执行:

@Override
public Response execute() throws IOException {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    transmitter.timeoutEnter();
    transmitter.callStart();
    try {
        // 通知Dispatcher
        client.dispatcher().executed(this);
        // 获取Response
        return getResponseWithInterceptorChain();
    } finally {
    	// 不论是否成功,通知Dispatcher请求完成
        client.dispatcher().finished(this);
    }
}

它首先调用了 Dispatcher.executed 方法,通知 Dispatcher 该 Call 被执行,之后调用到了 getResponseWithInterceptorChain 方法获取 Response,不论是否成功都会调用 Dispatcher.finished 通知 Dispatcher 该 Call 执行完成。

Dispatcher 任务调度

enqueue

我们看看 Dispatcher 是如何调度异步请求的,来到 Dispatcher.enqueue 方法:

void enqueue(AsyncCall call) {
    synchronized (this) {
    	// 加入等待队列
        readyAsyncCalls.add(call);
        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.get().forWebSocket) {
            // 寻找同一个host的Call
            AsyncCall existingCall = findExistingCallWithHost(call.host());
            // 复用Call的callsPerHost
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
        }
    }
    // 尝试执行等待队列中的任务
    promoteAndExecute();
}

这里先将其加入了 readAsyncCalls 这一等待队列中。

之后调用了 findExistingCallWithHost 方法尝试寻找 host 相同的 Call,它会遍历 readyAsyncCallsrunningAsyncCalls 两个队列寻找 host 相同的 Call。

若找到了对应的 Call,则会调用 call.reuseCallsPerHostFrom 来复用这个 Call 的 callsPerHost,从而便于统计一个 host 对应的 Call 的个数,它是一个 AtomicInteger

最后会调用 promoteAndExecute 方法,这个方法会尝试将等待队列中的任务执行。

executed

我们继续看看 Dispatcher 是如何调度同步请求的,来到 Dispatcher.executed 方法:

synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
}

这里很简单,直接将该同步任务加入了执行队列 runningSyncCalls 中。

promoteAndExecute

我们看到 promoteAndExecute 方法:

private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));
    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
            AsyncCall asyncCall = i.next();
            if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
            if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
            i.remove();
            // 增加host对应执行中call的数量
            asyncCall.callsPerHost().incrementAndGet();
            executableCalls.add(asyncCall);
            runningAsyncCalls.add(asyncCall);
        }
        isRunning = runningCallsCount() > 0;
    }
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
        AsyncCall asyncCall = executableCalls.get(i);
        asyncCall.executeOn(executorService());
    }
    return isRunning;
}

在这个方法中遍历了 readyAsyncCalls 队列,不断地寻找能够执行的 AsynCall,若找到则会在最后统一调用 AsyncCall.executeOn 方法在自己的 executorService 线程池中执行该 Call。其中,执行中的任务不能超过 maxRequests

finished

我们从前面 AsyncCall 的实现可以看出,每次请求完成后,不论成功失败,都会调用到 finished 方法通知 Dispatcher 请求结束:

void finished(AsyncCall call) {
	// 减少host对应执行中call的数量
    call.callsPerHost().decrementAndGet();
    finished(runningAsyncCalls, call);
}

它调用到了 finished 的另一个重载:

private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
        if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
        idleCallback = this.idleCallback;
    }
     
    // 尝试执行等待队列中的任务
	boolean isRunning = promoteAndExecute();
    if (!isRunning && idleCallback != null) {
        idleCallback.run();
    }
}

可以看到,这里再次调用了 promoteAndExecute 方法尝试执行等待队列中的任务,若当前等待队列中没有需要执行的任务,说明目前还比较空闲,没有到达设定的 maxRequests 。此时会调用 idleCallback.run 执行一些空闲 Callback

(这种设计有点类似 HandlerIdleHandler 机制,充分利用了一些空闲资源,值得我们学习)。

小结

可以看出,OkHttp 的任务的调度器的设计将请求分别分至了两个队列中,分别是等待队列及执行队列。

每次加入新的异步请求时,都会先将其加入等待队列,之后遍历等待队列尝试执行等待任务。

每次加入新的同步请求时,都会直接将其加入执行队列。

而每当一个请求完成时,都会通知到 Dispatcher,Dispatcher 会遍历准备队列尝试执行任务,若没有执行则说明等待队列是空的,则会调用 idleCallback.run 执行一些空闲时的任务,类似 Handler 的 IdleHandler 机制。

(在多线程下载器中的任务调度器就用到了这里的 Dispatcher 的设计)

响应的获取

从前面的同步和异步请求中都可以看出,响应的获取的核心实现是 RealCall.getResponseWithInterceptorChain 方法:

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    // 初始化拦截器列表
    List<Interceptor> interceptors = new ArrayList<>();
    // 用户自定义的 Interceptor
    interceptors.addAll(client.interceptors());
    interceptors.add(new RetryAndFollowUpInterceptor(client));
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
    	// 用户自定义的网络 Interceptor
        interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
            originalRequest, this, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    boolean calledNoMoreExchanges = false;
    try {
        Response response = chain.proceed(originalRequest);
        if (transmitter.isCanceled()) {
            closeQuietly(response);
            throw new IOException("Canceled");
        }
        return response;
    } catch (IOException e) {
        calledNoMoreExchanges = true;
        throw transmitter.noMoreExchanges(e);
    } finally {
        if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null);
        }
    }
}

这个方法非常重要,短短几行代码就实现了对请求的所有处理,它体现了 OkHttp 中一个很重要的核心设计——拦截器机制。

它首先在 interceptors 中加入了用户自定义的拦截器,之后又按顺序分别加入了各种系统内置的拦截器。

之后通过 RealInterceptorChain 的构造 函数构造了一个 Chain 对象,之后调用了其 proceed 方法,从而得到了该请求的 Response。

那么这个过程中究竟是如何获取到 Response 的呢?让我们先理解一下 OkHttp 的拦截器机制。

拦截器机制概述

OkHttp 的网络请求的过程就是依赖于各种拦截器(Interceptor)实现的,我们先看看 Interceptor 的定义:

/**
 * Observes, modifies, and potentially short-circuits requests going out and the corresponding
 * responses coming back in. Typically interceptors add, remove, or transform headers on the request
 * or response.
 */
public interface Interceptor {
    Response intercept(Chain chain) throws IOException;
    
    interface Chain {
        Request request();
        Response proceed(Request request) throws IOException;
        /**
         * Returns the connection the request will be executed on. This is only available in the chains
         * of network interceptors; for application interceptors this is always null.
         */
        @Nullable
        Connection connection();
        Call call();
        int connectTimeoutMillis();
        Chain withConnectTimeout(int timeout, TimeUnit unit);
        int readTimeoutMillis();
        Chain withReadTimeout(int timeout, TimeUnit unit);
        int writeTimeoutMillis();
        Chain withWriteTimeout(int timeout, TimeUnit unit);
    }
}

Interceptor 实际上是一个接口,里面只有一个方法 intercept 以及一个接口 Chain

Interceptor

其中,intercept 方法往往是如下的结构:

@Override 
public Response intercept(Chain chain) throws IOException {
	Request request = chain.request();
	// Request阶段,该拦截器在Request阶段负责做的事情

	// 调用RealInterceptorChain.proceed(),其实是在递归调用下一个拦截器的intercept()方法
	response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);

	// Response阶段,完成了该拦截器在Response阶段负责做的事情,然后返回到上一层的拦截器。
	return response;     
}

这里先调用了 chain.request 方法获取到了本次请求的 Request 对象,

之后调用了 chain.proceed 方法递归调用下一个拦截器的 interceptor 方法。

最后返回了 chain.proceed 方法所返回的 Response

上面简单的三行代码将整个 intercept 过程分为了两个阶段:

  • Request 阶段:执行一些该拦截器在 Request 阶段所负责的事情
  • Response 阶段:完成该拦截器在 Response 阶段所负责的事情

这其实是采用了一种递归的设计,类似我们计算机网络中的分层模型,将 OkHttp 的请求分为了几个阶段,分别代表了不同的拦截器,不同拦截器分别会在这个递归的过程中有两次对该请求的处理的可能,一次是在 Request 之前,一次是在 Response 之后,中间的过程中若出现了错误,则通过抛出异常来通知上层。

预置的 Interceptor 有如下几种:

  • RetryAndFollowUpInterceptor:负责实现重定向功能
  • BridgeInterceptor:将用户构造的请求转换为向服务器发送的请求,将服务器返回的响应转换为对用户友好的响应
  • CacheInterceptor:读取缓存、更新缓存
  • ConnectInterceptor:建立与服务器的连接
  • CallServerInterceptor:从服务器读取响应

可以看出,整个网络请求的过程由各个拦截器互相配合从而实现,通过这种拦截器的机制,可以很方便地调节网络请求的过程及先后顺序,同时也能够很方便地使用户对其进行扩展。

其中用户可以在两个时机插入 Interceptor:

  • 网络请求前后:通过 OkHttpClient.addInterceptor 方法添加
  • 读取响应前后:通过 OkHttpClient.addNetworkInterceptor 方法添加

其整体流程如图所示:

image-20190730145213711

RealInterceptorChain

我们再看看是如何通过 RealInterceptorChain 将整个拦截器的调用过程连接起来的,我们先看看其构造过程:

public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
                            @Nullable Exchange exchange, int index, Request request, Call call,
                            int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.transmitter = transmitter;
    this.exchange = exchange;
    this.index = index;
    this.request = request;
    this.call = call;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
}

这里只是一些赋值过程,我们接着看到 chain.proceed 方法,看看它是如何执行的:

public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
        throws IOException {
   	// ...
   	// 构建下一个Interceptor的Chain
    RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
    // 获取当前Interceptor并执行intercept方法
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    // ...
    return response;
}

这里省略了一些异常处理,可以看到它首先构造了下一个拦截器对应的 Chain,之后获取到了当前的拦截器并调用了其 intercept 方法获取其结果,在 intercept 方法的参数中传入的就是下一个拦截器对应的 Chain

通过这种递归的设计,从而实现了从上到下,再从下到上这样一个递与归的过程,从而十分漂亮地实现了 HTTP 请求的全过程。

这是一种类似责任链模式的实现,这样的实现在网络请求的过程中十分常见,也十分值得我们去学习。

小结

OkHttp 在读取响应的过程中采用了一种责任链模式,预置了多个负责不同功能的拦截器,将它们通过责任链连接在一起,采用了一种递归的方式进行调用,从而使得每一层在请求前和响应后都能对本次请求作出不同的处理,通过各个拦截器的协调合作,最终完成了整个网络请求的过程。

参考资料

OkHttp 3.x 源码解析之Interceptor 拦截器

okhttp之旅(二)--请求与响应流程

java并发之SynchronousQueue实现原理