解刨OkHttp之访问连接

1,833 阅读6分钟
原文链接: www.jianshu.com

因为OkHttp能讲的东西太多了,上一篇文章只是讲到了他的设计架构即责任链模式和异步多线程网络访问,这对于OkHttp只是冰山一角,对于一个网络请求框架,最重要的就是网络访问了,为此我们来说一下Okttp网络访问的一些细节。

这个访问分为两个部分,一个部分是与服务器形成连接,另一个部分是与服务器进行交互。与服务器连接的是ConnectInterceptor拦截器,而与服务器交互的是CallServerInterceptor拦截器。我们就来讲一下这两个拦截器吧。

ConnectInterceptor

先看源码:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

这里看起来很简单,就是给首先StreamAllocation赋值,然后调用newStream()方法,那streamAllocation是什么东西呢?它是整个连接的中心,协调着几个重要的类。后面都会说。 我们看一下newStream()方法:

public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    int connectTimeout = client.connectTimeoutMillis();
    int readTimeout = client.readTimeoutMillis();
    int writeTimeout = client.writeTimeoutMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
}

通过看代码我们有追溯findHealthyConnection()方法,源码如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
}
  
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) {
        route = selectedRoute;
        return connection;
      }

      route = selectedRoute;
      refusedStreamCount = 0;
      result = new RealConnection(connectionPool, selectedRoute);
      acquire(result);
    }

    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      Internal.instance.put(connectionPool, result);

      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
}
  

在findHealthyConnection()里通过while(true)不断调用findConnection()去获取健康可用的RealConnection。RealConnection是与服务器连接的一个socket的连接,有和这个就可以进行三次握手的tcp连接,所以上面的result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);这一行很关键,就是socket连接服务器的关键代码,具体里面的代码如下:

public void connect(
      int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
   //省略代码

    while (true) {
      try {
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout);
        } else {
          connectSocket(connectTimeout, readTimeout);
        }
        establishProtocol(connectionSpecSelector);
        break;
      } catch (IOException e) {
         //省略代码
      }
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
      throws IOException {
    Request tunnelRequest = createTunnelRequest();
    HttpUrl url = tunnelRequest.url();
    int attemptedConnections = 0;
    int maxAttempts = 21;
    while (true) {
      if (++attemptedConnections > maxAttempts) {
        throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
      }

      connectSocket(connectTimeout, readTimeout);
      tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);

      if (tunnelRequest == null) break; 

      closeQuietly(rawSocket);
      rawSocket = null;
      sink = null;
      source = null;
    }
  }

private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
}    

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }

    connectTls(connectionSpecSelector);

    if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .build();
      http2Connection.start();
    }
}

不管是直接调用connectSocket()还是connectTunnel(),最终都会调connectSocket()方法然后通过 Platform.get().connectSocket()(rawSocket, route.socketAddress(), connectTimeout);进行Socket连接。Platform.get().connectSocket()对应代码如下:

 public void connectSocket(Socket socket, InetSocketAddress address,
      int connectTimeout) throws IOException {
    socket.connect(address, connectTimeout);
  }

然后有调用了establishProtocol(),起始最主要的就是初始化Http2Connection对象.接着返回StreamAllocation,下一步就是HttpCodec resultCodec = resultConnection.newCodec(client, this);newCodec()源码如下:

public HttpCodec newCodec(OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
    if (http2Connection != null) {
      return new Http2Codec(client, streamAllocation, http2Connection);
    } else {
      socket.setSoTimeout(client.readTimeoutMillis());
      source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
}

其实没什么,也就是初始化Http1Codec或Http2Codec对象,这两个类都集成接口类HttpCodec,典型的面向接口编程,我们看一下接口类是什么:

public interface HttpCodec {
  int DISCARD_STREAM_TIMEOUT_MILLIS = 100;
  //写入请求体
  Sink createRequestBody(Request request, long contentLength);
  //写入请求头    
  void writeRequestHeaders(Request request) throws IOException;
  //相当于flush,把请求刷入底层socket
  void flushRequest() throws IOException;
  //相当于flush,把请求输入底层socket并不在发出请求
  void finishRequest() throws IOException;
  //读取响应头
  Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;
  //读取响应体
  ResponseBody openResponseBody(Response response) throws IOException;
  //取消请求
  void cancel();
}

有方法知道HttpCodec是网络读写的管理类,而Http1Codec和Http2Codec分别对应Http1和Http2,在后面的CallServerInterceptor就主要用这个类进行操作。最后ConnectInterceptor的RealConnection connection = streamAllocation.connection();只是获取了前面生成的RealConnection,然后通过前一篇介绍的责任链模式传给CallServerInterceptor。

CallServerInterceptor

前面的ConnectInterceptor只是socket连接了服务器,而连接后怎么操作就是CallServerInterceptor了,接着我们看一下其实现方法:

@Override 
public Response intercept(Chain chain) throws IOException {
    //省略代码。。。
    
    //写入请求头
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      //写入请求体     
      if (responseBuilder == null) {
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();
    //读取响应头
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    //读取响应体
    int code = response.code();
    if (forWebSocket && code == 101) {
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}

这里有一个东西需要讲,就是Socket连接了服务器之后,是通过Okio向服务器发送请求的。再次列取writeRequestHeaders()方法,源码如下:

@Override
public void writeRequestHeaders(Request request) throws IOException {
    String requestLine = RequestLine.get(
        request, streamAllocation.connection().route().proxy().type());
    writeRequest(request.headers(), requestLine);
}

public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
}

而sink就是在ConnectInterceptor已经初始化完成了,就在上面的connectSocket()连接服务器的方法里

  source = Okio.buffer(Okio.source(rawSocket));
  sink = Okio.buffer(Okio.sink(rawSocket));

其中sink是向服务器写数据,而source是获取服务器数据,rawSocket就是我们与服务器保持连接socker,okio我只会点到为止,不然又要开新的一篇讲解了。createRequestBody()的原理和writeRequestHeaders()是一样的。
接着就是接收数据了,读取响应头readResponseHeaders()和上面原理一样的,值得讲的是读取响应体
httpCodec.openResponseBody(response),里面的源码如下:

@Override 
public ResponseBody openResponseBody(Response response) throws IOException {
    Source source = getTransferStream(response);
    return new RealResponseBody(response.headers(), Okio.buffer(source));
}

private Source getTransferStream(Response response) throws IOException {
    if (!HttpHeaders.hasBody(response)) {
      return newFixedLengthSource(0);
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      return newChunkedSource(response.request().url());
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      return newFixedLengthSource(contentLength);
    }

    return newUnknownLengthSource();
}

其实没有做任何的读取操作,只是ResponseBody封装了headers()获取响应头和Source对象,而Source就可以获取响应体,只是没有马上获取而是封装好传递给上一个拦截器。最后在哪里获取响应体呢, 回到上一篇刚开始最简单的访问网络demo

 Request request = new Request.Builder().url(url).build();
 Response response = client.newCall(request).execute();
 return response.body().string();

我们看一下body().toString()这个方法

public @Nullable ResponseBody body() {
    return body;
}

public final String string() throws IOException {
    BufferedSource source = source();
    try {
      Charset charset = Util.bomAwareCharset(source, charset());
      return source.readString(charset);
    } finally {
      Util.closeQuietly(source);
    }
}   

source就是Okio的读对象对应上面的source = Okio.buffer(Okio.source(rawSocket));,bomAwareCharset是获取字符类型,默认utf-8,调用source.readString(charset);就可以获取他的请求体了,也就是请求内容字符串。closeQuietly()最终这个读对象,整个访问流程也基本结束了。

内容有点多,自身感觉讲解的也仅讲了最主要的部分,很多东西还可以扩展却因为篇幅没说,请见谅。