先从Okhttp3的请求实现说起(包括同步请求和异步请求)
基本使用
同步请求:
private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url(DESTINATION_ADDRESS)
.build();
Response response = client.newCall(request).execute();
异步请求:
private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url(DESTINATION_ADDRESS)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
小结:
- 不管是同步请求还是异步请求,都需要初始化OkHttpClient以及创建一个Request,然后再调用OkHttpClinet的newCall方法,创建出一个RealCall对象
- 对于同步请求,是调用RealCall的execute方法;而异步请求,则是调用RealCall的enqueue方法实现
简介
- OkHttpCLient:在Okhttp库中,OkHttpClient处于一个中心者的地位,很多功能都需要通过它来转发或者实现的。在创建的时候,初始化了很多功能类,比如缓存,拦截器,网络连接池,分发器等类。 由于初始化比较复杂,OkHttpClient内部提供了Builder模式来创建,一般情况下,OkHttpClient是唯一的
- Request:是用来构建一个请求对象的,符合Http请求的标准,包含了请求头,方法等等属性,较为复杂,因此同样提供Builder模式构建。
- Response:是用来构建一个响应对象的,包含了响应头,响应码,数据等等属性,同样也提供Builder模式构建
RealCall
// 同步
client.newCall(request).execute()
// 异步
client.newCall(request).enqueue(Callback)
同步和异步请求,都是调用OkHttpClient的newCall方法创建一个RealCall对象,然后通过这个对象,执行请求的
@Override
public Call newCall(Request request) {
return new RealCall(this, request);
}
execute 同步请求
@Override
public Response execute() throws IOException {
// 方法检测,execute只能调用1次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
// 将RealCall添加到Dispatcher中的同步请求队列中
client.dispatcher().executed(this);
// 获取响应的数据
// Okhttp会对数据(请求流和响应流)进行拦截进行一些额外的处理,
// 比如失败重连,添加请求头,没网络优先返回缓存数据等等。
// 这一拦截过程,采用责任链模式来实现的,和Android中的事件传递机制差不多
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
// 请求完成或者取消,都会将RealCall从同步请求队列中移除
client.dispatcher().finished(this);
}
}
enqueue-异步请求
@Override
public void enqueue(Callback responseCallback) {
// 和execute一样,enqueue也只能调用1次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 执行请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall
AsyncCall–实际上是一个Runnable
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
NamedRunnable,继承Runnable的抽象类,根据传入的名称来修改线程名称,抽象出执行任务的execute方法
final class AsyncCall extends NamedRunnable {
// 其他省略,execute方法式真正执行请求
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
AsyncCall中的execute方法,是请求任务执行的方法,获取相应数据最终也是调用了getResponseWithInterceptorChain方法。
private Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!retryAndFollowUpInterceptor.isForWebSocket()) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(
retryAndFollowUpInterceptor.isForWebSocket()));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
Dispatcher
原理:
- 对于异步请求来说,Dispatcher类内部持有一个正在执行(运行中)异步任务的队列和一个等待执行(就绪)异步请求的队列,以及一个线程池
- 这个线程池,没有常存的核心线程,最多线程数为Integer.MAX_VALUE,线程空闲时存活时间为60秒,而SynchronousQueue #2 是不保存任务的,所以只要把任务添加进去就会执行
- OkHttp不是在线程池中维护线程的个数,线程是一直在Dispatcher中直接控制。线程池中的请求都是运行中的请求。这也就是说线程的重用不是线程池控制的,通过源码我们发现线程重用的地方是请求结束的地方finished(AsyncCall call) ,而真正的控制是通过promoteCalls方法, 根据maxRequests和maxRequestsPerHost来调整runningAsyncCalls和readyAsyncCalls,使运行中的异步请求不超过两种最大值,并且如果队列有空闲,将就绪状态的请求归类为运行中
public final class Dispatcher {
//最大请求数
private int maxRequests = 64;
//相同host最大请求数
private int maxRequestsPerHost = 5;
//请求执行,懒加载
private ExecutorService executorService;
//就绪状态的异步请求队列
private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
//运行中的异步请求队列
private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
//进行中的同步请求,包括那些尚未完成被取消的请求
private final Deque<Call> executedCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService getExecutorService() {
if (executorService == null) {
//corePoolSize 为 0表示,没有核心线程,所有执行请求的线程,使用完了如果过期了(keepAliveTime)就回收了,
//maximumPoolSize 无限大的线程池空间
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
synchronized void enqueue(AsyncCall call) {
if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningCalls.add(call);
getExecutorService().execute(call);
} else {
readyCalls.add(call);
}
}
//取消带有tag的所有请求
public synchronized void cancel(Object tag) {
for (AsyncCall call : readyCalls) {
if (Util.equal(tag, call.tag())) {
call.cancel();
}
}
for (AsyncCall call : runningCalls) {
if (Util.equal(tag, call.tag())) {
call.get().canceled = true;
HttpEngine engine = call.get().engine;
if (engine != null) engine.cancel();
}
}
for (Call call : executedCalls) {
if (Util.equal(tag, call.tag())) {
call.cancel();
}
}
}
//异步请求结束
//当该异步请求结束的时候,会调用此方法,
//用于将运行中的异步请求队列中的该请求移除并调整请求队列
//此时就绪队列中的请求就可以进入运行中的队列
synchronized void finished(AsyncCall call) {
if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
//根据maxRequests和maxRequestsPerHost来调整
//runningAsyncCalls和readyAsyncCalls
//使运行中的异步请求不超过两种最大值
//并且如果队列有空闲,将就绪状态的请求归类为运行中。
private void promoteCalls() {
//运行中的异步请求队列的请求数大于最大请求数,那么就没必要去将就绪状态的请求移动到运行中。
//其实就是说,如果有超过最大请求数的请求正在运行,是不需要将其移出队列的,继续运行完即可
if (runningCalls.size() >= maxRequests) return;
//如果就绪的队列为空,那就更没有必要移动了,因为都没有。
if (readyCalls.isEmpty()) return;
//遍历就绪队列
for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
//取出一个请求
AsyncCall call = i.next();
//如果当前请求对应的host下,没有超过maxRequestsPerHost
//那么将其从就绪队列中移除,并加入在运行队列。
if (runningCallsForHost(call) < maxRequestsPerHost) {
//移除
i.remove();
//加入运行队列
runningCalls.add(call);
//立即执行该请求
getExecutorService().execute(call);
}
//如果运行队列已经到达了最大请求数上限
//此时如果还有就绪中的请求,也不管了
if (runningCalls.size() >= maxRequests) return;
}
}
//对比已有的运行中的请求和当前请求的host
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
synchronized void executed(Call call) {
executedCalls.add(call);
}
//同步请求结束
//当该同步请求结束的时候,会调用此方法,
//用于将运行中的同步请求队列中的该请求移除
synchronized void finished(Call call) {
if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
}
public synchronized int getRunningCallCount() {
return runningCalls.size();
}
public synchronized int getQueuedCallCount() {
return readyCalls.size();
}
}