OkHttp 源码分析(三):连接机制

551 阅读6分钟

前面两篇文章分别介绍了 OkHttp 的请求流程和缓存机制,最后这篇文章介绍 OkHttp 的连接机制,作为 OkHttp 源码分析的收尾。

建议将 OkHttp 的源码下载下来,使用 IDEA 编辑器可以直接打开阅读。我这边也将最新版的源码下载下来,进行了注释说明,有需要的可以直接从 Android open framework analysis 下载查看。

创建连接

OkHttp 连接的创建是通过 StreamAllocation 对象统筹完成。

它主要用来管理两个角色:

  • RealConnection:真正建立连接的对象,利用 Socket 建立连接。
  • ConnectionPool:连接池,用来管理和复用连接。

StreamAllocation 是在 RetryAndFollowUpInterceptor 中被创建,此时并未发起连接。

// RetryAndFollowUpInterceptor .intercept()

StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);

真正的连接是在处理完 Header 和缓存之后,调用 ConnectInterceptor 进行的。

// ConnectInterceptor.intercept()
    
StreamAllocation streamAllocation = realChain.streamAllocation();

// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();

这里创建了两个对象:

  • HttpCodec:用来编码 http request 和解码 http response
  • RealConnection:上文介绍了。

调用 streamAllocation 的 newStream 方法经过一系列判断最终会走到 findConnection 方法

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");


      // 1、尝试使用已分配的连接
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // 当前连接可用.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }

      // 2、尝试从连接池中获取一个连接
      if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.acquire(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // 如果从连接池中获取到了一个连接,就将其返回.
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }


    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        // 根据一系列的 IP地址从连接池中获取一个链接
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          // 从连接池中获取一个连接
          Internal.instance.acquire(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      // 3、如果连接池中没有可用连接,则创建一个
      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    //4、 开始TCP以及TLS握手操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // 5、将新创建的连接,放在连接池中.
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

整个流程是:

  • 1、判断当前的连接是否可以使用:输入输出流没有关闭,Socket 未关闭等
  • 2、如果当前连接不可用,尝试从连接池中获取一个可用连接
  • 3、如果连接池中没有可用连接,则创建一个连接
  • 4、开始 TCP 连接以及 TLS 握手操作
  • 5、将新创建的连接加入到连接池中

连接池

网络请求时频繁地进行 Socket 连接和断开 Socket 非常消耗网络资源和浪费时间,连接复用可以提升网络访问的效率。这里就引入了连接池的概念。

OKHttp 的连接池由 ConnectionPool 实现。

ConnetionPool 内部维护了一个线程池,负责清理无效的连接。

public final class ConnectionPool {
   private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
   void put(RealConnection connection) {
    assert (Thread.holdsLock(this));

    if (!cleanupRunning) {
      cleanupRunning = true;
      // 使用线程池执行清理任务
      executor.execute(cleanupRunnable);
    }
    // 将新建的连接插入到双端队列中
    connections.add(connection);
   }
    
   private final Runnable cleanupRunnable = () -> {
    while (true) {
      // 清理操作,返回下次需要清理的时间
      long waitNanos = cleanup(System.nanoTime());
      if (waitNanos == -1) return;
      if (waitNanos > 0) {
        long waitMillis = waitNanos / 1000000L;
        waitNanos -= (waitMillis * 1000000L);
        synchronized (ConnectionPool.this) {
          try {
            ConnectionPool.this.wait(waitMillis, (int) waitNanos);
          } catch (InterruptedException ignored) {
          }
        }
      }
    }
  };
}

ConnectionPool 维护一个线程池用于清理无效的连接,清理任务由 cleanup方法完成,首先执行清理,返回下次需要清理的间隔时间,然后调用 wait 方法释放锁。等到了时间,再次进行清理操作,返回下一次清理的时间,循环往复下去。

具体看一下 cleanup 方法:

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
        // 遍历所有的连接
        for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();

            // 1、连接正在使用,即StreanAllocation的引用数量大于0
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                inUseConnectionCount++;
                continue;
            }

            idleConnectionCount++;

            // If the connection is ready to be evicted, we're done.
            // 2、如果找到了一个可以被清理的连接,会尝试去寻找闲置时间最久的连接来释放
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
                longestIdleDurationNs = idleDurationNs;
                longestIdleConnection = connection;
            }
        }

        // maxIdleConnections 表示最大允许的闲置的连接的数量,keepAliveDurationNs表示连接允许存活的最长的时间。
        // 默认空闲连接最大数目为5个,keepalive 时间最长为5分钟
        // 3、如果空闲连接超过5个或者keepalive时间大于5分钟,则将该连接清理
        if (longestIdleDurationNs >= this.keepAliveDurationNs
            || idleConnectionCount > this.maxIdleConnections) {
            connections.remove(longestIdleConnection);
        } else if (idleConnectionCount > 0) {
            // 4、闲置的连接的数量大于0,停顿指定的时间(等会儿会将其清理掉,现在还不是时候)
            return keepAliveDurationNs - longestIdleDurationNs;
        } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            ///5、所有的连接都在使用中,5分钟后再清理
            return keepAliveDurationNs;
        } else {
            // No connections, idle or in use.
            //6、没有连接
            cleanupRunning = false;
            return -1;
        }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
}

整体流程如下:

  • 1、遍历所有连接,查询每个连接的内部的 StreamAllocation 的引用数量,如果大于 0,表示连接正在使用,无需清理,执行下一次循环。
  • 2、如果找到了一个可以被清理的连接,会尝试去寻找闲置时间最久的连接来释放。
  • 3、如果空闲连接超过 5 个或者 keepalive 时间大于 5 分钟,则将该连接清理。
  • 4、闲置的连接的数量大于 0,返回该连接的到期时间(等会儿会将其清理掉,现在还不是时候)。
  • 5、全部都是活跃连接,5 分钟后再进行清理。
  • 6、没有任何连接,跳出循环。

RealConnection 内有一个 SteamAllocation 虚引用列表,每次创建的 StreamAllocation,都会被添加到这个列表中,如果流关闭后就将 SteamAllocation 对象从该列表中移出去,也正是利用这种计数方式判定一个连接是否为空闲连接。

查询引用计数是在 pruneAndGetAllocationCount 方法中实现。

private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    // 虚引用列表
    List<Reference<StreamAllocation>> references = connection.allocations;
    // 遍历虚引用列表
    for (int i = 0; i < references.size(); ) {
        Reference<StreamAllocation> reference = references.get(i);
        //如果虚引用StreamAllocation正在被使用,则跳过进行下一次循环
        if (reference.get() != null) {
            i++;
            continue;
        }

        // We've discovered a leaked allocation. This is an application bug.
        StreamAllocation.StreamAllocationReference streamAllocRef =
            (StreamAllocation.StreamAllocationReference) reference;
        String message = "A connection to " + connection.route().address().url()
            + " was leaked. Did you forget to close a response body?";
        Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

        // 移除引用
        references.remove(i);
        connection.noNewStreams = true;

        // If this was the last allocation, the connection is eligible for immediate eviction.
        if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
        }
    }

    return references.size();
}

参考

[juejin.cn/post/684490…](