阅读 274

你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点)

Okhttp系列文章:

你想要的系列:网络请求框架OkHttp3全解系列 - (一)OkHttp的基本使用

你想要的系列:网络请求框架OkHttp3全解系列 - (二)OkHttp的工作流程分析

你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点)

你想要的系列:网络请求框架OkHttp3全解系列 - (四)拦截器详解2:连接、请求服务(重点)


在本系列的上一篇文章你想要的系列:网络请求框架OkHttp3全解系列 - (三)拦截器详解1:重试重定向、桥、缓存(重点)中,我们分析了OkHttp拦截器链中的前三个拦截器:RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor,它们在请求建立连接之前做了一些预处理。

如果请求经过这三个拦截器后,要继续往下传递,说明是需要进行网络请求的(缓存不能直接满足),也就是今天要分析的内容——剩下的两个拦截器:ConnectInterceptor、CallServerInterceptor,分别负责 连接建立请求服务读写

背景 - HTTP协议发展

在讲解拦截器之前,我们有必要了解http协议相关背景知识,因为okhttp的网络连接正是基于此实现的。HTTP协议经历了以下三个版本阶段。

HTTP1.0

HTTP1.0中,一次请求 会建立一个TCP连接,请求完成后主动断开连接。这种方法的好处是简单,各个请求互不干扰。 但每次请求都会经历 3次握手、2次或4次挥手的连接建立和断开过程——极大影响网络效率和系统开销。

http1.0

HTTP1.1

HTTP1.1中,解决了HTTP1.0中连接不能复用的问题,支持持久连接——使用keep-alive机制:一次HTTP请求结束后不会立即断开TCP连接,如果此时有新的HTTP请求,且其请求的Host同上次请求相同,那么会直接复用TCP连接。这样就减少了建立和关闭连接的消耗和延迟。keep-alive机制在HTTP1.1中是默认打开的——即在请求头添加:connection:keep-alive。(keep-alive不会永久保持连接,它有一个保持时间,可在不同的服务器软件(如Apache)中设定这个时间)

http1.1

HTTP2.0

HTTP1.1中,连接的复用是串行的:一个请求建立了TCP连接,请求完成后,下一个相同host的请求继续使用这个连接。 但客户端想 同时 发起多个并行请求,那么必须建立多个TCP连接。将会产生网络延迟、增大网路开销。

并且HTTP1.1不会压缩请求和响应报头,导致了不必要的网络流量;HTTP1.1不支持资源优先级导致底层TCP连接利用率低下。在HTTP2.0中,这些问题都会得到解决,HTTP2.0主要有以下特性

  • 新的二进制格式(Binary Format):http/1.x使用的是明文协议,其协议格式由三部分组成:request line,header,body,其协议解析是基于文本,但是这种方式存在天然缺陷,文本的表现形式有多样性,要做到健壮性考虑的场景必然很多,二进制则不同,只认0和1的组合;基于这种考虑,http/2.0的协议解析决定采用二进制格式,实现方便且健壮
  • 多路复用(MultiPlexing):即连接共享,使用streamId用来区分请求,一个request对应一个stream并分配一个id,这样一个TCP连接上可以有多个stream,每个stream的frame可以随机的混杂在一起,接收方可以根据stream id将frame再归属到各自不同的request里面
  • 优先级和依赖(Priority、Dependency):每个stream都可以设置优先级和依赖,优先级高的stream会被server优先处理和返回给客户端,stream还可以依赖其它的sub streams;优先级和依赖都是可以动态调整的,比如在APP上浏览商品列表,用户快速滑动到底部,但是前面的请求已经发出,如果不把后面的优先级设高,那么当前浏览的图片将会在最后展示出来,显然不利于用户体验
  • header压缩:http2.0使用encoder来减少需要传输的header大小,通讯双方各自cache一份header fields表,既避免了重复header的传输,又减小了需要传输的大小
  • 重置连接:很多APP里都有停止下载图片的需求,对于http1.x来说,是直接断开连接,导致下次再发请求必须重新建立连接;http2.0引入RST_STREAM类型的frame,可以在不断开连接的前提下取消某个request的stream

其中涉及了两个新的概念:

  • 数据流-stream:基于TCP连接之上的逻辑双向字节流,用于承载双向消息,对应一个请求及其响应。客户端每发起一个请求就建立一个数据流,后续该请求及其响应的所有数据都通过该数据流传输。每个数据流都有一个唯一的标识符和可选的优先级信息。
  • 帧-frame:HTTP/2的最小数据切片单位,承载着特定类型的数据,例如 HTTP 标头、消息负载,等等。 来自不同数据流的帧可以交错发送,然后再根据每个帧头的数据流标识符重新组装,从而在宏观上实现了多个请求或响应并行传输的效果。
    http2.0

这里的 多路复用机制 就实现了 在同一个TCP连接上 多个请求 并行执行。

无论是HTTP1.1的Keep-Alive机制还是HTTP2.0的多路复用机制,在实现上都需要引入连接池来维护网络连接。下面就开始分析 OkHttp中的连接池实现——连接拦截器ConnectInterceptor

ConnectInterceptor

连接拦截器ConnectInterceptor代码如下:

//打开到目标服务的连接、处理下一个拦截器
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}
复制代码

代码很少,主要是使用transmitter.newExchange获取Exchange实例,并作为参数调用拦截器链的proceed的方法。注意到前面分析过的拦截器调用的proceed方法是一个参数的,而这里是三个参数的。这是因为下一个拦截器(如果没有配置网络拦截器的话,就是CallServerInterceptor,也是最后一个)需要进行真正的网络IO操作,而 Exchange(意为交换)主要作用就是真正的IO操作:写入请求、读取响应(会在下一个拦截器做介绍)。

实际上获取Exchange实例的逻辑处理都封装在Transmitter中了。前面的文章提到过Transmitter,它是“发射器”,是把 请求 从应用端 发射到 网络层,它持有请求的 连接、请求、响应 和 流,一个请求对应一个Transmitter实例,一个数据流。下面就看下它的newExchange方法:

  Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    synchronized (connectionPool) {
      if (noMoreExchanges) {
        throw new IllegalStateException("released");
      }
      if (exchange != null) {
        throw new IllegalStateException("cannot make a new request because the previous response "
            + "is still open: please call response.close()");
      }
    }

    ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
    Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

    synchronized (connectionPool) {
      this.exchange = result;
      this.exchangeRequestDone = false;
      this.exchangeResponseDone = false;
      return result;
    }
  }
复制代码

若是第一次请求,前面两个if是没走进去的。接着看到使用exchangeFinder的find方法获取到了ExchangeCodec实例,然后作为参数构建了Exchange实例,并返回。嗯,看起来也很简单的样子。 注意到这个方法里涉及了连接池RealConnectionPool、交换寻找器ExchangeFinder、交换编解码ExchangeCodec、交换管理Exchange这几个类(翻译成这样尽力了😊,意会吧)。

  • RealConnectionPool,连接池,负责管理请求的连接,包括新建、复用、关闭,理解上类似线程池。
  • ExchangeCodec,接口类,负责真正的IO操作—写请求、读响应,实现类有Http1ExchangeCodec、Http2ExchangeCodec,分别对应HTTP1.1协议、HTTP2.0协议。
  • Exchange,管理IO操作,可以理解为 数据流,是ExchangeCodec的包装,增加了事件回调;一个请求对应一个Exchange实例。传给下个拦截器CallServerInterceptor使用。
  • ExchangeFinder,(从连接池中)寻找可用TCP连接,然后通过连接得到ExchangeCodec。

ExchangeFinder

ExchangeFinder的作用从名字就可以看出——Exchange寻找者,本质是为请求寻找一个TCP连接。如果已有可用连接就直接使用,没有则打开一个新的连接。 一个网络请求的执行,需要先有一个指向目标服务的TCP连接,然后再进行写请求、读响应的IO操作。ExchangeFinder是怎么寻找的呢?继续往下看~

我们先看exchangeFinder初始化的地方:

  public void prepareToConnect(Request request) {
    ...
    this.exchangeFinder = new ExchangeFinder(this, connectionPool, createAddress(request.url()),
        call, eventListener);
  }
复制代码

看到这里应该会想起上一篇文章中分析RetryAndFollowUpInterceptor时提到过,prepareToConnect这个方法作用是连接准备,就是创建了ExchangeFinder实例。主要到传入的参数有connectionPool、createAddress方法返回的Address、call、eventListener。connectionPool是连接池,稍后分析,先看下createAddress方法:

  private Address createAddress(HttpUrl url) {
    SSLSocketFactory sslSocketFactory = null;
    HostnameVerifier hostnameVerifier = null;
    CertificatePinner certificatePinner = null;
    if (url.isHttps()) {
      sslSocketFactory = client.sslSocketFactory();
      hostnameVerifier = client.hostnameVerifier();
      certificatePinner = client.certificatePinner();
    }

    return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
        sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
        client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
  }
复制代码

使用url和client配置 创建一个Address实例。Address意思是指向服务的连接的地址,可以理解为请求地址及其配置。Address有一个重要作用:相同Address的HTTP请求 共享 相同的连接。这可以作为前面提到的 HTTP1.1和HTTP2.0 复用连接 的请求的判断。

回头看exchangeFinder的find方法

  public ExchangeCodec find(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //找到一个健康的连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //利用连接实例化ExchangeCodec对象,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec
      return resultConnection.newCodec(client, chain);
    } catch (RouteException e) {
      trackFailure();
      throw e;
    } catch (IOException e) {
      trackFailure();
      throw new RouteException(e);
    }
  }
复制代码

主要就是通过findHealthyConnection方法获取连接RealConnection实例,然后用RealConnection的newCodec方法获取了ExchangeCodec实例,如果是HTTP/2返回Http2ExchangeCodec,否则返回Http1ExchangeCodec,然后返回。

findHealthyConnection方法名透露着 就是去寻找可用TCP连接的,而我们猜测这个方法内部肯定和连接池ConnectionPool有紧密的关系。接着跟进findHealthyConnection方法:

  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      //找连接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // 是新连接 且不是HTTP2.0 就不用体检
      synchronized (connectionPool) {
        if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
          return candidate;
        }
      }
      // 体检不健康,继续找
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      	//标记不可用
        candidate.noNewExchanges();
        continue;
      }

      return candidate;
    }
  }
复制代码

循环寻找连接:如果是不健康的连接,标记不可用(标记后会移除,后面讲连接池会讲到),然后继续找。健康是指连接可以承载新的数据流,socket是连接状态。我们跟进findConnection方法,看看到底是怎么 找连接 的:

  //为承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    RealConnection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      //请求已被取消(Call的cancel方法->transmitter的cancel方法),抛异常
      if (transmitter.isCanceled()) throw new IOException("Canceled");
      hasStreamFailure = false; 

      // 尝试使用 已给数据流分配的连接.(例如重定向请求时,可以复用上次请求的连接)
      releasedConnection = transmitter.connection;
      //有已分配的连接,但已经被限制承载新的数据流,就尝试释放掉(如果连接上已没有数据流),并返回待关闭的socket。
      toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
          ? transmitter.releaseConnectionNoEvents()
          : null;

      if (transmitter.connection != null) {
        // 不为空,说明上面没有释放掉,那么此连接可用
        result = transmitter.connection;
        releasedConnection = null;
      }

      if (result == null) {
        // 没有已分配的可用连接,就尝试从连接池获取。(连接池稍后详细讲解)
        if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        } else if (nextRouteToTry != null) {
          selectedRoute = nextRouteToTry;//有可尝试的路由
          nextRouteToTry = null;
        } else if (retryCurrentRoute()) {
          selectedRoute = transmitter.connection.route();
        }
      }
    }
    closeQuietly(toClose);//(如果有)关闭待关闭的socket

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);//(如果有)回调连接释放事件
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);//(如果有)回调(从连接池)获取连接事件
    }
    if (result != null) {
      // 如果有已分配可用连接 或 从连接池获取到连接,结束!  没有 就走下面的新建连接过程。
      return result;
    }

    // 如果需要路由信息,就获取。是阻塞操作
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    List<Route> routes = null;
    synchronized (connectionPool) {
      if (transmitter.isCanceled()) throw new IOException("Canceled");

      if (newRouteSelection) {
        //现在有了IP地址,再次尝试从连接池获取。可能会因为连接合并而匹配。(这里传入了routes,上面的传的null)
        routes = routeSelection.getAll();
        if (connectionPool.transmitterAcquirePooledConnection(
            address, transmitter, routes, false)) {
          foundPooledConnection = true;
          result = transmitter.connection;
        }
      }
	  //第二次连接池也没找到,就新建连接
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = new RealConnection(connectionPool, selectedRoute);
        connectingConnection = result;
      }
    }

    // 如果第二次从连接池的尝试成功了,结束,因为连接池中的连接是已经和服务器建立连接的
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 第二次没成功,就把新建的连接,进行TCP + TLS 握手,与服务端建立连接. 是阻塞操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    connectionPool.routeDatabase.connected(result.route());//从失败名单中移除

    Socket socket = null;
    synchronized (connectionPool) {
      connectingConnection = null;
      // 最后一次尝试从连接池获取,注意最后一个参数为true,即要求 多路复用(http2.0)
      //意思是,如果本次是http2.0,那么为了保证 多路复用性,(因为上面的握手操作不是线程安全)会再次确认连接池中此时是否已有同样连接
      if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
        // 如果获取到,就关闭我们创建里的连接,返回获取的连接
        result.noNewExchanges = true;
        socket = result.socket();
        result = transmitter.connection;

        // 那么这个刚刚连接成功的路由 就可以 用作下次 尝试的路由
        nextRouteToTry = selectedRoute;
      } else {
        //最后一次尝试也没有的话,就把刚刚新建的连接存入连接池
        connectionPool.put(result);
        transmitter.acquireConnectionNoEvents(result);//把连接赋给transmitter
      }
    }
    closeQuietly(socket);//如果刚刚建立的连接没用到,就关闭

    eventListener.connectionAcquired(call, result);
    return result;
  }
复制代码

代码看着很长,已经加了注释,方法目的就是 为 承载新的数据流 寻找 连接。寻找顺序是 已分配的连接、连接池、新建连接。梳理如下:

  1. 首先会尝试使用 已给数据流分配的连接。(已分配连接的情况例如重定向时的再次请求,说明上次已经有了连接)
  2. 若没有 已分配的可用连接,就尝试从连接池中 匹配获取。因为此时没有路由信息,所以匹配条件:address一致——host、port、代理等一致,且 匹配的连接可以接受新的数据流。
  3. 若从连接池没有获取到,则取下一个代理的路由信息(多个Route,即多个IP地址),再次尝试从连接池获取,此时可能因为连接合并而匹配到。
  4. 若第二次也没有获取到,就创建RealConnection实例,进行TCP + TLS 握手,与服务端建立连接。
  5. 此时为了确保Http2.0连接的多路复用性,会第三次从连接池匹配。因为新建立的连接的握手过程是非线程安全的,所以此时可能连接池新存入了相同的连接。
  6. 第三次若匹配到,就使用已有连接,释放刚刚新建的连接;若未匹配到,则把新连接存入连接池并返回。

流程图如下:

findConnection方法流程

看到这里,小盆友,你是否有很多问号?

  • 开始若有了已分配的连接,但已经被限制承载新的数据流,是如何释放的呢?
  • 代理路由信息是如何获取的呢?
  • 如何从连接池获取连接的?三次有什么不同?

没关系,慢慢来,我们先看第二个问号,代理路由信息的获取。

RouteSelector

先来看下Route类:

public final class Route {
  final Address address;
  final Proxy proxy;//代理
  final InetSocketAddress inetSocketAddress;//连接目标地址

  public Route(Address address, Proxy proxy, InetSocketAddress inetSocketAddress) {
    ...
    this.address = address;
    this.proxy = proxy;
    this.inetSocketAddress = inetSocketAddress;
  }
复制代码

Route,通过代理服务器信息 proxy、连接目标地址 InetSocketAddress 来描述一条 连接服务器的具体路由

  • proxy代理:可以为客户端显式配置代理服务器。否则,将使用ProxySelector代理选择器。可能会返回多个代理。
  • IP地址:无论是直连还是代理,打开socket连接都需要IP地址。 DNS服务可能返回多个IP地址尝试。

上面分析的findConnection方法中是使用routeSelection.getAll()获取Route集合routes,而routeSelection是通过routeSelector.next()获取,routeSelector是在ExchangeFinder的构造方法内创建的,也就是说routeSelector在RetryAndFollowUpInterceptor中就创建了,那么我们看下RouteSelector:

  RouteSelector(Address address, RouteDatabase routeDatabase, Call call,
      EventListener eventListener) {
    this.address = address;
    this.routeDatabase = routeDatabase;//连接池中的路由黑名单(连接失败的路由)
    this.call = call;
    this.eventListener = eventListener;

    resetNextProxy(address.url(), address.proxy());
  }
  //收集代理服务器
  private void resetNextProxy(HttpUrl url, Proxy proxy) {
    if (proxy != null) {
      // 若指定了代理,那么就这一个。(就是初始化OkhttpClient时配置的)
      proxies = Collections.singletonList(proxy);
    } else {
      //没配置就使用ProxySelector获取代理(若初始化OkhttpClient时没有配置ProxySelector,会使用系统默认的) 
      List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
      proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
          ? Util.immutableList(proxiesOrNull)
          : Util.immutableList(Proxy.NO_PROXY);
    }
    nextProxyIndex = 0;
  }
复制代码

注意到RouteSelector的构造方法中传入了routeDatabase,是连接失败的路由黑名单(后面连接池也会讲到),并使用resetNextProxy方法获取代理服务器列表:若没有指定proxy就是用ProxySelector获取proxy列表(若没有配置ProxySelector会使用系统默认)。接着看next方法:

  //收集代理的路由信息
  public Selection next() throws IOException {
    if (!hasNext()) {//还有下一个代理
      throw new NoSuchElementException();
    }

    List<Route> routes = new ArrayList<>();
    while (hasNextProxy()) {
      Proxy proxy = nextProxy();
      //遍历proxy经DNS后的所有IP地址,组装成Route
      for (int i = 0, size = inetSocketAddresses.size(); i < size; i++) {
        Route route = new Route(address, proxy, inetSocketAddresses.get(i));
        if (routeDatabase.shouldPostpone(route)) {//此路由在黑名单中,存起来最后尝试
          postponedRoutes.add(route);
        } else {
          routes.add(route);
        }
      }

      if (!routes.isEmpty()) {
        break;
      }
    }

    if (routes.isEmpty()) {
      // 若没有拿到路由,就尝试上面存的黑名单的路由
      routes.addAll(postponedRoutes);
      postponedRoutes.clear();
    }
    //routes包装成Selection返回
    return new Selection(routes);
  }
复制代码

next方法主要就是获取下一个代理Proxy的代理信息,即多个路由。具体是在resetNextInetSocketAddress方法中实现,主要是对代理服务地址进行DNS解析获取多个IP地址,这里就不展开了,具体可以参考OkHttp中的代理和路由

好了,到这里 就解决了第二个问号。其他两个问号涉及 连接池RealConnectionPool、连接RealConnection,下面就来瞅瞅。

ConnectionPool

ConnectionPool,即连接池,用于管理http1.1/http2.0连接重用,以减少网络延迟。相同Address的http请求可以共享一个连接,ConnectionPool就是实现了连接的复用。

public final class ConnectionPool {
  final RealConnectionPool delegate;
  //最大空闲连接数5,最大空闲时间5分钟
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }
  
  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.delegate = new RealConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit);
  }
  //返回空闲连接数
  public int idleConnectionCount() {
    return delegate.idleConnectionCount();
  }
  //返回池子中的连接数
  public int connectionCount() {
    return delegate.connectionCount();
  }
  //关闭并移除所以空闲连接
  public void evictAll() {
    delegate.evictAll();
  }
}
复制代码

ConnectionPool看起来比较好理解,默认配置是最大空闲连接数5,最大空闲时间5分钟(即一个连接空闲时间超过5分钟就移除),我们也可以在初始化okhttpClient时进行不同的配置。需要注意的是ConnectionPool是用于应用层,实际管理者是RealConnectionPool。RealConnectionPool是okhttp内部真实管理连接的地方。

连接池对连接的管理无非是 存、取、删,上面的两个问号分别对应 删、取,跟进RealConnectionPool我们一个个看:

  private final Deque<RealConnection> connections = new ArrayDeque<>();
  
  private final Runnable cleanupRunnable = () -> {
    //循环清理
    while (true) {
      //清理
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (RealConnectionPool.this) {
          try {
            //下一次清理之前的等待
            RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
  //存
  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }
复制代码

connections是用于存连接的队列Deque。看到在add之前 使用线程池executor执行了cleanupRunnable,意思是清理连接,为啥要清理呢?上面提到过 连接池有 最大空闲连接数、最大空闲时间的限制,所以不满足时是要进行清理的。并且注意到清理是一个循环,并且下一次清理前要等待waitNanos时间,啥意思呢?我们看下cleanup方法:

  long cleanup(long now) {
    int inUseConnectionCount = 0;//正在使用的连接数
    int idleConnectionCount = 0;//空闲连接数
    RealConnection longestIdleConnection = null;//空闲时间最长的连接
    long longestIdleDurationNs = Long.MIN_VALUE;//最长的空闲时间

    //遍历连接:找到待清理的连接, 找到下一次要清理的时间(还未到最大空闲时间)
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //若连接正在使用,continue,正在使用连接数+1
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
		//空闲连接数+1
        idleConnectionCount++;

        // 赋值最长的空闲时间和对应连接
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }
	  //若最长的空闲时间大于5分钟 或 空闲数 大于5,就移除并关闭这个连接
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // else,就返回 还剩多久到达5分钟,然后wait这个时间再来清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //连接没有空闲的,就5分钟后再尝试清理.
        return keepAliveDurationNs;
      } else {
        // 没有连接,不清理
        cleanupRunning = false;
        return -1;
      }
    }
	//关闭移除的连接
    closeQuietly(longestIdleConnection.socket());

    //关闭移除后 立刻 进行下一次的 尝试清理
    return 0;
  }
复制代码

思路还是很清晰的:

  • 有空闲连接的话,如果最长的空闲时间大于5分钟 或 空闲数 大于5,就移除关闭这个最长空闲连接;如果 空闲数 不大于5 且 最长的空闲时间不大于5分钟,就返回到5分钟的剩余时间,然后等待这个时间再来清理。
  • 没有空闲连接就等5分钟后再尝试清理。
  • 没有连接不清理。

其中判断连接正在使用的方法pruneAndGetAllocationCount我们来看下:

  private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    //连接上的数据流,弱引用列表
    List<Reference<Transmitter>> references = connection.transmitters;
    for (int i = 0; i < references.size(); ) {
      Reference<Transmitter> reference = references.get(i);
      if (reference.get() != null) {
        i++;
        continue;
      }

      // 到这里,transmitter是泄漏的,要移除,且此连接不能再承载新的数据流(泄漏的原因就是下面的message)
      TransmitterReference transmitterRef = (TransmitterReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      Platform.get().logCloseableLeak(message, transmitterRef.callStackTrace);
      references.remove(i);
      connection.noNewExchanges = true;

      //连接因为泄漏没有数据流了,那么可以立即移除了。所以设置 开始空闲时间 是5分钟前(厉害厉害!)
      if (references.isEmpty()) {
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }
    //返回连接上的数据流数量,大于0说明正在使用。
    return references.size();
  }
复制代码

逻辑注释已经标明了,很好理解。其中connection.transmitters,表示在此连接上的数据流,transmitters size大于1即表示多个请求复用此连接。

另外,在findConnection中,使用connectionPool.put(result)存连接后,又调用transmitter.acquireConnectionNoEvents方法,瞅下:

  void acquireConnectionNoEvents(RealConnection connection) {
    assert (Thread.holdsLock(connectionPool));
    if (this.connection != null) throw new IllegalStateException();
    this.connection = connection;
    connection.transmitters.add(new TransmitterReference(this, callStackTrace));
  }
复制代码

先把连接赋给transmitter,表示数据流transmitter依附在这个connection上;然后connection.transmitters add 这个transmitter的弱引用,connection.transmitters表示这个连接承载的所有数据流,即承载的所有请求。

好了,存 讲完了,主要就是把连接存入队列,同时开始循环尝试清理过期连接。

  //为transmitter 从连接池 获取 对应address的连接。若果routes不为空,可能会因为 连接合并(复用) 而获取到HTTP/2连接。
  boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
      @Nullable List<Route> routes, boolean requireMultiplexed) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (requireMultiplexed && !connection.isMultiplexed()) continue;
      if (!connection.isEligible(address, routes)) continue;
      transmitter.acquireConnectionNoEvents(connection);
      return true;
    }
    return false;
  }
复制代码

存的方法名是put,但你发现 取 的方法名却不是get,transmitterAcquirePooledConnection意思是 为transmitter 从连接池 获取连接,实际上transmitter就代表一个数据流,也就是一个http请求。注意到,在遍历中 经过判断后也是transmitter的acquireConnectionNoEvents方法,即把匹配到的connection赋给transmitter。所以方法名还是很生动的。

继续看是如何匹配的:如果requireMultiplexed为false,即不是多路复用(不是http/2),那么就要看Connection的isEligible方法了,isEligible方法返回true,就代表匹配成功:

  //用于判断 连接 是否 可以承载指向address的数据流
  boolean isEligible(Address address, @Nullable List<Route> routes) {
    //连接不再接受新的数据流,false
    if (transmitters.size() >= allocationLimit || noNewExchanges) return false;

    //匹配address中非host的部分
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    //匹配address的host,到这里也匹配的话,就return true
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    //到这里hostname是没匹配的,但是还是有机会返回true:连接合并
    // 1. 连接须是 HTTP/2.
    if (http2Connection == null) return false;

    // 2. IP 地址匹配
    if (routes == null || !routeMatchesAny(routes)) return false;

    // 3. 证书匹配
    if (address.hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. 证书 pinning 匹配.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

  private boolean routeMatchesAny(List<Route> candidates) {
    for (int i = 0, size = candidates.size(); i < size; i++) {
      Route candidate = candidates.get(i);
      if (candidate.proxy().type() == Proxy.Type.DIRECT
          && route.proxy().type() == Proxy.Type.DIRECT
          && route.socketAddress().equals(candidate.socketAddress())) {
        return true;
      }
    }
    return false;
  }
复制代码

取的过程就是 遍历连接池,进行地址等一系列匹配,到这里第三个问号也解决了。

  //移除关闭空闲连接
  public void evictAll() {
    List<RealConnection> evictedConnections = new ArrayList<>();
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        if (connection.transmitters.isEmpty()) {
          connection.noNewExchanges = true;
          evictedConnections.add(connection);
          i.remove();
        }
      }
    }

    for (RealConnection connection : evictedConnections) {
      closeQuietly(connection.socket());
    }
  }
复制代码

遍历连接池,如果连接上的数据流是空,那么就从连接池移除并且关闭。

我们回过头看下Transmitter的releaseConnectionNoEvents方法,也就第一个问号,如果连接不再接受新的数据流,就会调用这个方法:

  //从连接上移除transmitter.
  @Nullable Socket releaseConnectionNoEvents() {
    assert (Thread.holdsLock(connectionPool));

    int index = -1;
    //遍历 此数据流依附的连接 上的所有数据流,找到index
    for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
      Reference<Transmitter> reference = this.connection.transmitters.get(i);
      if (reference.get() == this) {
        index = i;
        break;
      }
    }

    if (index == -1) throw new IllegalStateException();
	//transmitters移除此数据流
    RealConnection released = this.connection;
    released.transmitters.remove(index);
    this.connection = null;
	//如果连接上没有有数据流了,就置为空闲(等待清理),并返回待关闭的socket
    if (released.transmitters.isEmpty()) {
      released.idleAtNanos = System.nanoTime();
      if (connectionPool.connectionBecameIdle(released)) {
        return released.socket();
      }
    }

    return null;
  }
复制代码

主要就是尝试释放连接,连接上没有数据流就关闭socket等待被清理。

好了,到这里连接池的管理就分析完了。

从连接的查找 到 连接池的管理,就是ConnectInterceptor的内容了。

CallServerInterceptor

哎呀,终于到最后一个拦截器了!

请求服务拦截器,也就是真正地去进行网络IO读写了——写入http请求的header和body数据、读取响应的header和body。

上面ConnectInterceptor主要介绍了如何 寻找连接 以及 连接池如何管理连接。在获取到连接后,调用了RealConnection的newCodec方法ExchangeCodec实例,然后使用ExchangeCodec实例创建了Exchange实例传入CallServerInterceptor了。上面提到过ExchangeCodec负责请求和响应的IO读写,我们先来看看ExchangeCodec创建过程——RealConnection的newCodec方法:

  ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
    if (http2Connection != null) {
      return new Http2ExchangeCodec(client, this, chain, http2Connection);
    } else {
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1ExchangeCodec(client, this, source, sink);
    }
  }
复制代码

http2Connection不为空就创建Http2ExchangeCodec,否则是Http1ExchangeCodec。而http2Connection的创建是连接进行TCP、TLS握手的时候,即在RealConnection的connect方法中,具体就是connect方法中调用的establishProtocol方法:

  private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    //针对http请求,如果配置的协议包含Protocol.H2_PRIOR_KNOWLEDGE,则开启Http2连接
    if (route.address().sslSocketFactory() == null) {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        socket = rawSocket;
        protocol = Protocol.H2_PRIOR_KNOWLEDGE;
        startHttp2(pingIntervalMillis);
        return;
      }

      socket = rawSocket;
      protocol = Protocol.HTTP_1_1;
      return;
    }
	//针对https请求,会在TLS握手后,根据平台获取协议(),如果协议是Protocol.HTTP_2,则开启Http2连接
    eventListener.secureConnectStart(call);
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);

    if (protocol == Protocol.HTTP_2) {
      startHttp2(pingIntervalMillis);
    }
  }

  private void startHttp2(int pingIntervalMillis) throws IOException {
    socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
    http2Connection = new Http2Connection.Builder(true)
        .socket(socket, route.address().url().host(), source, sink)
        .listener(this)
        .pingIntervalMillis(pingIntervalMillis)
        .build();
    http2Connection.start();
  }
复制代码

好了,到这里不再深入了,继续了解可以参考HTTP 2.0与OkHttp。那么到这里,ExchangeCodec已经创建了,然后又包装成Exchange,最后传入了CallServerInterceptor。

下面就来看看这最后一个拦截器:

public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();//上个拦截器传入的exchange
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
	//写请求头
    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    //含body的请求
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 若请求头包含 "Expect: 100-continue" , 就会等服务端返回含有 "HTTP/1.1 100 Continue"的响应,然后再发送请求body. 
      //如果没有收到这个响应(例如收到的响应是4xx),那就不发送body了。
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }
	  //responseBuilder为null说明服务端返回了100,也就是可以继续发送body了
      if (responseBuilder == null) {
        if (request.body().isDuplex()) {//默认是false不会进入
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // 满足了 "Expect: 100-continue" ,写请求body
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
       //没有满足 "Expect: 100-continue" ,请求发送结束
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
     //没有body,请求发送结束
      exchange.noRequestBody();
    }

	//请求发送结束
    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }
	//回调 读响应头开始事件(如果上面没有)
    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }
	//读响应头(如果上面没有)
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }
	//构建response
    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      //这里服务端又返回了个100,就再尝试获取真正的响应()
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }
	//回调读响应头结束
    exchange.responseHeadersEnd(response);
	//这里就是获取响应body了
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }
	//请求头中Connection是close,表示请求完成后要关闭连接
    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }
	//204(无内容)、205(充值内容),body应该是空
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
	
    return response;
  }
}
复制代码

你会发现,整个内容就是前面说的一句话:写入http请求的header和body、读取响应的header和body。这里就不再解释了。

这里我们可以看到,无论写请求还是读响应,都是使用Exchange对应的方法。上面也提到过Exchange理解上是对ExchangeCodec的包装,这写方法内部除了事件回调和一些参数获取外,核心工作都由 ExchangeCodec 对象完成,而 ExchangeCodec实际上利用的是 Okio,而 Okio 实际上还是用的 Socket。

ExchangeCodec的实现类 有对应Http1.1的Http1ExchangeCodec 和 对应Http2.0的Http2ExchangeCodec。其中Http2ExchangeCodec是使用Http2.0中 数据帧 的概念完成请求响应的读写。关于Http1ExchangeCodec、Http2ExchangeCodec具体实现原理涉及okio这不再展开。

最后一点,CallServerInterceptor的intercept方法中没有调用连接器链Chain的proceed方法,因为这是最后一个拦截器啦!

好了,到这里最后一个拦截器也分析完啦!

总结

本篇分析了ConnectInterceptor、CallServerInterceptor两个拦截器的作用和原理。ConnectInterceptor负责连接的获取,其中涉及到连接池的概念;CallServerInterceptor是真正的网络IO读写。ConnectInterceptor涉及的内容较多,它是Okhttp的核心。 结合上一篇,我们已经分析完了Okhttp内部所有的拦截器,最后给出Okhttp的整体架构图(图片来源):

okhttp

到这里,Okhttp源码解析部分就真的结束了,可真是一个漫长的过程! 从使用方式到工作流程,再到具体拦截器,掌握了这四篇文章的内容,应该说得上对Okhttp是比较熟悉了。这里还计划会出第五篇终章,来介绍一些Okhttp的常见问题和高级使用方式,敬请期待!


感谢与参考:

okhttp源码解析

通过ConnectInterceptor源码掌握OKHttp3网络连接原理 呕心沥血第十弹【十】

OkHttp源码深度解析

OkHttp源码解析 (三)——代理和路由

OkHttp 源码学习笔记(三) 数据交换的流 HTTPCodec

最后的最后,欢迎留言讨论,如果你喜欢这一系列,或者觉得写得还不错,请帮忙 点赞、收藏和转发,感谢

欢迎关注我的 公 众 号

公众号:胡飞洋