阅读 2918

Android每周一轮子:OkHttp

前言

Okhttp

前两篇的文章讲解了Volley,HttpURLConnection,今天是对于OKHttp的分析,分析完成将会分析OKIO和retrofit,试图通过这一系列分析,来对Android的网络库的实现有足够充分的了解,当然这里的分析完全是脱离了对于项目的具体针对性实践,因此在一些细节上会有说欠缺,这也将会是接下来源码分析的下一步,从项目中的应用和实践优化作为出发点。

基础使用

  • 创建OKHttpClient和构造请求
OkHttpClient client = new OkHttpClient.Builder()
        .readTimeout(30, TimeUnit.SECONDS)
        .build();

Request request = new Request.Builder()
        .header("User-Agent", "OkHttp Headers.java")
        .url("http://www.baidu.com")
        .build();
复制代码
  • 发起异步请求
client.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
    }

    @Override
    public void onResponse(Call call, okhttp3.Response response) throws IOException {
    }
});
复制代码
  • 发起同步请求
okhttp3.Response response = client.newCall(request).execute();
复制代码

OkHttp的使用首先通过OkHttp的生成器来根据我们自己的配置创建一个OKHttpClient,然后构造一个请求,设置请求的header,url,请求的body,OkHttp提供了同步和异步两种请求执行方式。这里可以根据自己的需求选择一个合适的方式。

实现分析

按照Android每周一轮子的写作风格,基础使用作为一个引子,帮助我们迅速的切入框架的执行流程,快速的理清整个调用链路,了解该框架的实现,此处我们还是延续这种方式。针对上面的使用流程,逐步分析。

  • newCall
public Call newCall(Request request) {
  return RealCall.newRealCall(this, request, false /* for web socket */);
}
复制代码
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  RealCall call = new RealCall(client, originalRequest, forWebSocket);
  call.eventListener = client.eventListenerFactory().create(call);
  return call;
}
复制代码

根据创建的请求,构造一个RealCall实例,同时为该RealCall对象设置事件监听器。在EventListener中,定义了许多的函数,可以监控到网络请求的整个生命周期,包括DNS开始查找,DNS查找结束,连接的开始建立,连接建立失败等等。

  • enqueue

在连接建立之后,在异步请求中,将会调用enqueue方法。

public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
复制代码

在该方法中,会调用OkHttpClient的Dispatcher的enqueue方法。在创建OkHttpClient的时候,但开发者未设定Dispatcher时,会默认创建一个Dispatcher。这里按照默认的实现代码进行分析。

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    readyAsyncCalls.add(call);
  }
}
复制代码

加入到Dispatchr之中之后,对于请求进行判断,判断是否超过了最大请求数目,是否超过了单个host共享下的请求数目,如果超过了则将其加入到准备执行队列之中。如果请求数目过多的时候,将其放置在一个准备队列之中。对于ArrayDeque的数据结构解释文末。网络请求执行,是通过线程池来实现的,对于线程池的创建和具体的执行问题,将在文末具体分析。

在Dispatcher中用来管理请求的数据结构,分别为正在执行的异步请求队列,正在准备的异步请求队列,正在执行的同步请求队列。

Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
复制代码
  • execute

对于请求的具体执行过程,在AsyncCall的execute方法中。

final class AsyncCall extends NamedRunnable {

  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      Response response = getResponseWithInterceptorChain();
         .....
    } catch (IOException e) {
      ......
    } finally {
      client.dispatcher().finished(this);
    }
  }
}
复制代码

这里首先获得响应结果,在获得响应结果的时候,可能会出现一些异常情况,这里会catch到异常,会回调事件监听器的一些回调函数。对于获取请求响应结果的核心调用就是getResponseWithInterceptorChain通过层层责任链的执行来获得最终的请求结果。

  • getResponseWithInterceptorChain
Response getResponseWithInterceptorChain() throws IOException {
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  return chain.proceed(originalRequest);
}
复制代码

构建一个Interceptor列表,在其中添加用户设置的拦截器,框架自身的缓存,网络连接等等连接器,然后根据这一系列的拦截器构建出一个Interceptor.Chain实例对象,之后调用其processed方法。

  • 责任链的执行(proceed)

proceed的方法是网络请求执行的核心

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  if (index >= interceptors.size()) throw new AssertionError();

  calls++;

  //如果已经有存在的流,确定进入的请求可以使用它
  if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must retain the same host and port");
  }

 
  if (this.httpCodec != null && calls > 1) {
    throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
        + " must call proceed() exactly once");
  }

  // 调用该链中的下一个拦截器
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

//确认拦截器是否调用了chain的proceed方法。
  if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
    throw new IllegalStateException("network interceptor " + interceptor
        + " must call proceed() exactly once");
  }

  // 判断响应是否为空,为空抛出异常
  if (response == null) {
    throw new NullPointerException("interceptor " + interceptor + " returned null");
  }

  //判断响应体是否为空,抛出异常返回内容体为空
  if (response.body() == null) {
    throw new IllegalStateException(
        "interceptor " + interceptor + " returned a response with no body");
  }

  return response;
}
复制代码

责任链的处理函数执行的操作大致为对于一些状态进行判断,然后取其中的拦截器构造一个拦截链实例,然后执行拦截器的拦截方法,在其拦截方法中还会继续调用创建的新的RealChain方法的proceed方法,通过这种递归的方式来将数据进行层层包装处理,最终将数据丢回。对于其中的每一个拦截器在完成整个网络请求的过程发挥了关键的作用。接下来从第一个拦截器开始逐层次进行分析。

RetryAndFollowUpInterceptor

这个拦截器可以将其从失败和有必要的重定向中恢复。多少重定向和授权需要尝试,Chrome会跟随21次重定向,火狐,curl,wget会跟随20次,Safari会跟随16次,Http1.0推荐5次,这里则为20次。

通过一个While true死循环来进行重试操作,在这里建立StreamAllocation,然后创建新的拦截器链实例,然后调用其processed方法,等待返回结果回来。该拦截器在最外层,接下来的拦截器返回的响应结果,最终都会返回到这里被处理。

  @Override public Response intercept(Chain chain) throws IOException {

    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
           .....
      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
     
      //从失败的路由中恢复
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
      
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
        

      Request followUp = followUpRequest(response, streamAllocation.route());

      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }

      closeQuietly(response.body());
      //跟随次数判断
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }
复制代码

这里通过异常捕捉的方式,根据相应的异常出错,采取相应的恢复方式,同时记录相应的出错状态,但达到阀值之后,停止进行拉起重试操作。此次网络请求失败。

BridgeInterceptor

应用代码和网络代码之间的桥梁,用来根据用户的一个请求构建一个网络请求,最后,根据网络响应来构建一个用户响应。根据设置的一些头部参数进行相应的处理。比如GZIP压缩问题等等,根据传递参数构造RequestHeader,RequestBody。

public Response intercept(Chain chain) throws IOException {

  //请求体的构建
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }

//Gzip 压缩转换
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }
  
//获取响应结果,对响应结果进行包装
  Response networkResponse = chain.proceed(requestBuilder.build());

  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);

  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}
复制代码

作为应用层和网络层的桥梁,其主要目的是在请求到达网络层时,对网络请求进行包装和在网络请求的响应结果回来时,对响应结果进行包装,转到用户层。

CacheInterceptor

用来检测缓存中是否有数据,有检测无变化返回,否则网络请求后存放。

public Response intercept(Chain chain) throws IOException {
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  long now = System.currentTimeMillis();
  //获取请求的cache策略
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); 
  }

  // 如果我们既无法进行网络请求又无缓存,返回504错误
  if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(504)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(Util.EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
  }

  // 如果我们不需要网络请求,执行完成返回结果
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }

//执行责任链,根据网络请求获取响应结果
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // 获取请求响应后,根据cache状态,对cache内容进行相应的处理
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

//构造一个响应体
  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) {
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
复制代码

ConnectInterceptor

根据传递的数据,寻找并建立一个健康的连接,同时也会根据设置的一些超时时间等信息设置给响应的连接。再寻找该连接时候如果没有与目标主机建立连接,其将会与目标主机建立连接。

public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  Transmitter transmitter = realChain.transmitter();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

  return realChain.proceed(request, transmitter, exchange);
}
复制代码

这里的核心是在于Transmitter类,其作为网络和应用的一个中间层,在创建RealCall的时候,就创建了该对象,接下来,我们看一下其newExchange方法做了什么?

Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
  synchronized (connectionPool) {
    if (noMoreExchanges) {
      throw new IllegalStateException("released");
    }
    if (exchange != null) {
      throw new IllegalStateException("cannot make a new request because the previous response "
          + "is still open: please call response.close()");
    }
  }

  ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
  Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);

  synchronized (connectionPool) {
    this.exchange = result;
    this.exchangeRequestDone = false;
    this.exchangeResponseDone = false;
    return result;
  }
}
复制代码

Exchange是一个可以携带新的请求和响应的,首先通过ExchangeFinder来去查找。

RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
  writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
复制代码

在ExchangeFinder中的核心实现是调用了findHealthyConnection。

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
    int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
    boolean doExtensiveHealthChecks) throws IOException {
  while (true) {
    RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
        pingIntervalMillis, connectionRetryEnabled);

    // If this is a brand new connection, we can skip the extensive health checks.
    synchronized (connectionPool) {
      if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
        return candidate;
      }
    }

    // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
    // isn't, take it out of the pool and start again.
    if (!candidate.isHealthy(doExtensiveHealthChecks)) {
      candidate.noNewExchanges();
      continue;
    }

    return candidate;
  }
}
复制代码

该方法会通过一个死循环来不断的调用findConnecton来找到最终一个合适的连接。findConnection是连接复用与连接建立的核心实现。

if (address.url().host().equals(this.route().address().url().host())) {
  return true; // This connection is a perfect match.
}
复制代码

会首先从连接池中进行查找,进行host的比对,如果有比对成功的,怎返回当前的连接,如果则会尝试其它的route的ip,最终还是没有找到的话,则会尝试重新建立一个新的连接。

result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
    connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());

Socket socket = null;
synchronized (connectionPool) {
  connectingConnection = null;
  // Last attempt at connection coalescing, which only occurs if we attempted multiple
  // concurrent connections to the same host.
  if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
    // We lost the race! Close the connection we created and return the pooled connection.
    result.noNewExchanges = true;
    socket = result.socket();
    result = transmitter.connection;
    nextRouteToTry = selectedRoute;
  } else {
    connectionPool.put(result);
    transmitter.acquireConnectionNoEvents(result);
  }
}
复制代码

然后将建立的新的连接加入到连接池进行复用,Connection建立好之后,会调用其connect方法来进行连接的建立,首先判断是否需要隧道,如果不需要则直接建立Socket连接。Socket连接建立好之后,调用establishProtocol,上面socket只是完成了TCP层面的连接建立,后续进行协议层的处理,同时每一个步骤都通过eventListner进行相应的事件回调。

if (route.requiresTunnel()) {
  connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
  if (rawSocket == null) {
    // We were unable to connect the tunnel but properly closed down our resources.
    break;
  }
} else {
  connectSocket(connectTimeout, readTimeout, call, eventListener);
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
}
复制代码

Socket连接建立之后,其后续的都可以通过相应的包装向socket中进行数据的写入,对于Socket通过Okio进行包装。

CallServerInterceptor

这个拦截是拦截链中的最后一个,数据的写入过程,也就是发起网络请求和Server进行交互的过程,然后返回请求数据,这个时候再是层层的返回调用栈,将数据倒回去,然后进行层层的处理。

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Exchange exchange = realChain.exchange();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();
  //写入请求体的头部
  exchange.writeRequestHeaders(request);
  
  //写入请求体
    BufferedSink bufferedRequestBody = Okio.buffer(
    exchange.createRequestBody(request, false));
    request.body().writeTo(bufferedRequestBody);
    bufferedRequestBody.close();
   
    //从请求体中读出数据
    response = response.newBuilder()
        .body(exchange.openResponseBody(response))
        .build();

  return response;
}
复制代码

这个部分我们核心需要关注的就是我们的请求数据是如何写入到Socket的,服务器返回的数据是如何被我们读出来的。在连接的部分,我们已经知道建立一个连接之后,会通过Okio的包装一个InputStream还有一个OutputStream,也就是Okio的Source和Sink。上述是一个缩减代码,省略了相关一些逻辑的实现,这里主要表现对于请求头的写入,请求体的写入,然后从返回的数据中获取数据。其操作本质上是对于Socket包装层的一个写入流和读出流的包装。

OKHttp实现原理分析

相关类

  • Transmitter

处在OkHttp应用层和网络层的中间,可以用来进行流的取消,可以取消流,但是不会影响共享连接池中的其它流。其中维护了ConnectionPool,Call,RealConnection。

  • ConnectionPool

为了减少延迟,通过connectionPool来实现对于相同地址的连接的重用。其内部通过ArrayDeque维护了多个RealConnection。

  • Dispatcher

每一个调度器内部都有一个线程池来进行任务的执行调度。

  • RealCall

实际的请求执行

  • ExchangeCodec

对于Http请求的包装和对于Http请求返回结果的包装。其中包含了一个Socket连接用来进行http1的数据传输,同时也包含了请求,同时封装了对于Socket的读写操作。

  • RealConnection

在建立Socket连接的时候,会通过OKIO来包装Socket的,具备一个Sink,Source,

小结

本篇文章算不上对于源码的深度剖析,大概还是停留在表层的代码逻辑调用,对于其中用到的设计模式,各种技术,和其相比于其它网络库的优势所在,这里暂时都没有做分析,由于本周时间比较近,所以对于性能,优势特征,将会在接下来的一篇深度展开分析。同时下一篇也将会作为对OkIO分析的一个引子。