/** * 添加拦截器 * 通过RealInterceptorChain#proceed调用Interceptor集合中某个index拦截器的interceptor方法, * 此方法内部会再new一个RealInterceptorChain对象,index++,再再调用其proceed方法, * 形成递归调用。 * @return * @throws IOException */ Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. //TODO 责任链 倒序调用 Listinterceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); //TODO 5、处理重试与重定向 interceptors.add(retryAndFollowUpInterceptor); //TODO 4、处理 配置请求头等信息 interceptors.add(new BridgeInterceptor(client.cookieJar())); //TODO 3、处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应 //TODO 设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改) interceptors.add(new CacheInterceptor(client.internalCache())); //TODO 2、连接服务器 interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } //TODO 1、执行流操作(写出请求体、获得响应数据) //TODO 进行http请求报文的封装与请求报文的解析 interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0, originalRequest, this, eventListener, client.connectTimeoutMillis(), client.readTimeoutMillis(), client.writeTimeoutMillis()); return chain.proceed(originalRequest); }复制代码
// RealInterceptorChain.java public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } //创建新的拦截链,链中的拦截器集合index+1 // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout); //执行当前的拦截器 默认是:retryAndFollowUpInterceptor Interceptor interceptor = interceptors.get(index); //intercept方法中,又会调用chain.proceed() Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // Confirm that the intercepted response isn't null. if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); } if (response.body() == null) { throw new IllegalStateException( "interceptor " + interceptor + " returned a response with no body"); } return response; }复制代码
// Ready async calls in the order they'll be run. private final DequereadyAsyncCalls = new ArrayDeque<>(); // Running asynchronous calls. Includes canceled calls that haven't finished yet. private final Deque runningAsyncCalls = new ArrayDeque<>(); // Running synchronous calls. Includes canceled calls that haven't finished yet. private final Deque runningSyncCalls = new ArrayDeque<>();复制代码
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); eventListener.callStart(this); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } catch (IOException e) { eventListener.callFailed(this, e); throw e; } finally { client.dispatcher().finished(this); } }复制代码
//RealCall.java @Override public void enqueue(Callback responseCallback) { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); client.dispatcher().enqueue(new AsyncCall(responseCallback)); }复制代码
//Dispatcher.java private int maxRequests = 64; private int maxRequestsPerHost = 5; synchronized void enqueue(AsyncCall call) { //如果正在执行的请求小于设定值即64,并且请求同一个主机的request小于设定值即5 if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { //添加到执行队列,开始执行请求 runningAsyncCalls.add(call); //获得当前线程池,没有则创建一个 executorService().execute(call); } else { //添加到等待队列中 readyAsyncCalls.add(call); } }复制代码
@Override protected void execute() { boolean signalledCallback = false; try { //TODO 责任链模式 //TODO 拦截器链 执行请求 Response response = getResponseWithInterceptorChain(); //回调结果 if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { //TODO 移除队列 client.dispatcher().finished(this); } } }复制代码
private void promoteCalls() { //TODO 检查 运行队列 与 等待队列 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iteratori = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); //TODO 相同host的请求没有达到最大 if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }复制代码