【源码解析】OkHttp的工作原理

OkHttp 作为优秀的网络请求框架,已经得到了广大Android开发者的认可。对于它的使用方法,大家也是非常的熟悉。例如同步同步请求、异步请求等,都可以使用很简洁的逻辑来实现。由于 OkHttp 已经封装了繁琐复杂的请求逻辑,开发者只需要使用其提供的API就能轻松的实现网络请求,这使得开发者能将更多的精力放到业务开发上,提高了工作效率。

但是,作为一位有追求的Android开发者,不能一味的衣来伸手饭来张口。虽然不至于要自己动手,丰衣足食,但是我们也要做到知其然更知其所以然,从优秀的基础框架中学习其设计思想、架构思想,这对我们的成长也是有非常大的帮助的。

下面我们就以 OkHttp 为例,从源码的角度对其整体流程进行分析,以便能从中学习到其设计思路与工作原理。

整体架构

下面是OkHttp发起请求与数据响应的流程图(参考 拆轮子系列:拆 OkHttp 这篇文章画的)。

整体来说,是 OkHttpClient 通过 newCall 方法,进而触发 RealCall的execute (同步)、 enquene (异步)方法,经过一系列的 interceptors (拦截器),最后经过IO操作发起请求得到 Response (响应数据)。

下面针对上述 OkHttp 的整体工作流程,从源码的角度分析其中的原理,我们首先从同步请求说起。

同步请求

OkHttp 的同步请求示例如下所示。

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url(url)
    .build();

Response response = client.newCall(request).execute();
return response.body().string();
复制代码

OkHttpClient 有个内部类 Builder ,它的 build 方法可以生成一个 OkHttpClient 实例。但 OkHttpClient 提供了一个构造函数,让我们可以使用默认的配置来创建 OkHttpClient 实例。

OkHttpClient 实例的 newCall 方法接收一个 Request 对象,该对象由 Request 的内部类 Builder 构造出来。

public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false);
}
复制代码

newCall 方法中通过 RealCall 又调用了 newRealCall 方法,并返回 RealCall 对象。也就是说,实际上执行的是 RealCallexecute 方法。

public Response execute() throws IOException {
    synchronized (this) { // 1
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
        client.dispatcher().executed(this);
        Response result = getResponseWithInterceptorChain(); // 2
        if (result == null) throw new IOException("Canceled");
        return result;
    } catch (IOException e) {
        e = timeoutExit(e);
        eventListener.callFailed(this, e);
        throw e;
    } finally {
        client.dispatcher().finished(this);
    }
}
复制代码

1、每一个 RealCall 对象的 execute 方法只能执行一次,多次执行会抛出 IllegalStateException 异常。

2、通过执行 getResponseWithInterceptorChain 方法同步返回了 Response 对象。从上面的整体架构可知, getResponseWithInterceptorChain 方法是OkHttp发起网络请求的重点部分,我们接着往下面看。

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor); 
    interceptors.add(new BridgeInterceptor(client.cookieJar())); 
    interceptors.add(new CacheInterceptor(client.internalCache())); 
    interceptors.add(new ConnectInterceptor(client)); 
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket)); 

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }
}
复制代码

getResponseWithInterceptorChain 方法中的往List中加了很多拦截器,最后通过构造了 RealInterceptorChain 对象,并执行它的 proceed 方法返回一个 Response 对象。该方法的工作流程如下图所示。

这就是所谓的责任链模式,每一个节点负责自己的一部分工作,最后组装成一个完整的请求对象发送网络请求并返回 Response 。对于其中的每一个节点(拦截器),我们通过源码进一步分析其中的细节。

RealInterceptorChain

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  ...省略
  
  //生成下一个节点
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);

  ...省略

  return response;
}
复制代码

RealInterceptorChainproceed 方法主要做了两个操作:

1、生成了下一个节点对象 next ,类型为 RealInterceptorChain

2、调用了当前拦截器的 intercept 方法,并将 next 对象传递给该拦截器。从上述流程图中,我们认定当前拦截器是 RetryAndFollowUpInterceptor

RetryAndFollowUpInterceptor

public Response intercept(Chain chain) throws IOException {
  Request request = chain.request();
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Call call = realChain.call();
  EventListener eventListener = realChain.eventListener();

  StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
      createAddress(request.url()), call, eventListener, callStackTrace);
  this.streamAllocation = streamAllocation;

  int followUpCount = 0; //失败重试次数
  Response priorResponse = null;
  while (true) { //这里死循环的意思是会一直重试,直到遇到return或者抛出异常后才会跳出
    if (canceled) {
      streamAllocation.release();
      throw new IOException("Canceled");
    }

    Response response;
    boolean releaseConnection = true;
    try {
      response = realChain.proceed(request, streamAllocation, null, null);
      releaseConnection = false;
    } catch (RouteException e) {
      ...
      continue;
    } catch (IOException e) {
      ...
      continue;
    } finally {
      ...
    }

    ...

    Request followUp;
    try {
      followUp = followUpRequest(response, streamAllocation.route());
    } catch (IOException e) {
      streamAllocation.release();
      throw e;
    }

    if (followUp == null) { //不需要重试,直接返回Response对象
      streamAllocation.release();
      return response;
    }

    closeQuietly(response.body());

    if (++followUpCount > MAX_FOLLOW_UPS) { //重试次数加1,重试次数大于最大次数,释放资源
      streamAllocation.release();
      throw new ProtocolException("Too many follow-up requests: " + followUpCount);
    }

    //继续重试,直到重试次数大于最大重试次数
    request = followUp;
    priorResponse = response;
  }
}
复制代码

RetryAndFollowUpInterceptor ,从它的名字也能看出来,其主要目的是为失败重试和重定向而工作的。所以它的 intercept 方法主要利用了 while 循环进行多次的请求重试,只有当重试次数大于最大重试次数时才会跳出 while 循环。

BridgeInterceptor

RetryAndFollowUpInterceptorintercept 方法将沿着责任链,从而执行到了 BridgeInterceptorintercept 方法。

public Response intercept(Chain chain) throws IOException {
  Request userRequest = chain.request();
  Request.Builder requestBuilder = userRequest.newBuilder(); //构造了一个新的Request.Builder对象

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }

    long contentLength = body.contentLength();
    if (contentLength != -1) {
      requestBuilder.header("Content-Length", Long.toString(contentLength));
      requestBuilder.removeHeader("Transfer-Encoding");
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked");
      requestBuilder.removeHeader("Content-Length");
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
  }
  
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

  Response networkResponse = chain.proceed(requestBuilder.build());

  HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

  Response.Builder responseBuilder = networkResponse.newBuilder()
      .request(userRequest);

  if (transparentGzip
      && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
      && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    Headers strippedHeaders = networkResponse.headers().newBuilder()
        .removeAll("Content-Encoding")
        .removeAll("Content-Length")
        .build();
    responseBuilder.headers(strippedHeaders);
    String contentType = networkResponse.header("Content-Type");
    //将网络返回的数据构进一步的封装并返回
    responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
  }

  return responseBuilder.build();
}
复制代码

BridgeInterceptor 的拦截方法要做的事情并不多,主要目的是判断网络请求 Request 对象 header 对象的字段为空时,构造一个默认对象,而且在网络数据返回后,将返回的 Response 对象进一步的封装并返回。

CacheInterceptor

BridgeInterceptorintercept 方法又将网络请求继续分发,它的下一个拦截器则是 CacheInterceptor 。从 CacheInterceptor 的名字与其注释可知,该拦截器的主要功能是获取缓存的 Response 以及将网络请求返回 Response 存储到缓存中。

public Response intercept(Chain chain) throws IOException {
  Response cacheCandidate = cache != null
      ? cache.get(chain.request())
      : null;

  ...省略

  //没有网络,则使用缓存
  if (networkRequest == null) {
    return cacheResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build();
  }
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }

  // 有缓存,如果返回的Response和缓存比对后没有改变,则返回缓存
  if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
      Response response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers(), networkResponse.headers()))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
      networkResponse.body().close();

      cache.trackConditionalCacheHit();
      cache.update(cacheResponse, response);
      return response;
    } else {
      closeQuietly(cacheResponse.body());
    }
  }

  Response response = networkResponse.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build();

  if (cache != null) { //缓存改变了,则重新写入新的Response到缓存
    if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
      CacheRequest cacheRequest = cache.put(response);
      return cacheWritingResponse(cacheRequest, response);
    }

    if (HttpMethod.invalidatesCache(networkRequest.method())) {
      try {
        cache.remove(networkRequest);
      } catch (IOException ignored) {
        // The cache cannot be written.
      }
    }
  }

  return response;
}
复制代码

从上面的 interceptor 方法中可以看出,该方法主要做了以下事情。

1、判断是否需要返回缓存。(一般无网络时会返回缓存)

2、有网络的情况下, Request 请求正常发送,判断返回的 Response 内容是否有更新。没有更新,则返回缓存内容;有更新,则返回新的 Response ,并将新的 Response 内容写入到缓存中。

ConnectInterceptor

CacheInterceptor 拦截器的下一个责任链节点是 ConnectInterceptor 拦截器。

public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
复制代码

ConnectInterceptor 拦截器的 intercept 方法主要负责建立连接,也就是创建了 HttpCodec 对象。 HttpCodec 是一个接口类,有两个实现类,分别为 Http1CodecHttp2CodecHttp1Codec 使用的是 Http1.0 版本的协议发送网络请求,而 Http2Codec 使用的是 Http2.0 版本的协议发送网络请求。从 HttpCodec 接口方法和其实现逻辑来看,其中主要封装了Java的 IO 操作,通过 stream 字节流参与网络请求的发送与接收过程。

CallServerInterceptor

public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  HttpCodec httpCodec = realChain.httpStream();
  StreamAllocation streamAllocation = realChain.streamAllocation();
  RealConnection connection = (RealConnection) realChain.connection();
  Request request = realChain.request();

  long sentRequestMillis = System.currentTimeMillis();

  realChain.eventListener().requestHeadersStart(realChain.call());
  httpCodec.writeRequestHeaders(request);
  realChain.eventListener().requestHeadersEnd(realChain.call(), request);

  Response.Builder responseBuilder = null;
  if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
    if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
      httpCodec.flushRequest();
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(true);
    }

    if (responseBuilder == null) {
      realChain.eventListener().requestBodyStart(realChain.call());
      long contentLength = request.body().contentLength();
      CountingSink requestBodyOut =
          new CountingSink(httpCodec.createRequestBody(request, contentLength));
      BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

      request.body().writeTo(bufferedRequestBody);
      bufferedRequestBody.close();
      realChain.eventListener()
          .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
    } else if (!connection.isMultiplexed()) {
      streamAllocation.noNewStreams();
    }
  }

  httpCodec.finishRequest();

  if (responseBuilder == null) {
    realChain.eventListener().responseHeadersStart(realChain.call());
    responseBuilder = httpCodec.readResponseHeaders(false);
  }

  Response response = responseBuilder
      .request(request)
      .handshake(streamAllocation.connection().handshake())
      .sentRequestAtMillis(sentRequestMillis)
      .receivedResponseAtMillis(System.currentTimeMillis())
      .build();

  ...省略
  return response;
}
复制代码

CallServerInterceptor 作为最后一个拦截器,其主要利用了 HttpCodecreadResponseHeaders 方法获取 Response 数据,并将 Response 返回。

至此我们分析完了 OkHttp 中关于同步强求的整体流程。其中特别重要的是 OkHttp 中的拦截器分层原理,也就是所谓的责任链设计模式。 OkHttp 的请求会将经过拦截器一层层的分发,直到有拦截器将 Response 进行返回。而返回的 Response 也会传递到之前的每一个拦截器,每一个拦截器对该 Response 进行加工封装,最后形成一个统一的 Response 对象返回。

异步请求

OkHttp 的异步请求示例如下所示。

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url(url)
    .build();

okHttpClient.newCall(request).enqueue(new Callback() {
    @Override
    public void onFailure(Call call, IOException e) {
        //请求失败的回调
    }

    @Override
    public void onResponse(Call call, Response response) throws IOException {
        //请求成功的回调
    }
});
复制代码

异步请求调用了 okHttpClient.newCall 方法,从上面的同步请求分析可以知道, okHttpClient.newCall 返回一个 RealCall 对象,也就是说异步请求其实调用的是 RealCall的enqueue 方法。

public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
复制代码

client.dispatcher 返回一个 Dispatcher 对象。该对象的功能如其名字一样,它会将请求进行管理并分发,其中的 enqueue 方法如下所示。

void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);
  }
  promoteAndExecute();
}
复制代码

enqueue 方法将AsyncCall对象加入了 readyAsyncCalls 队列,然后执行 promoteAndExecute 方法。

private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}
复制代码

promoteAndExecute 方法执行时,会比较 readyAsyncCalls 队列中的请求对象个数是否大于 maxRequests 的值,如果 readyAsyncCalls 队列的请求对象个数小于 maxRequests ,则将这些请求对象加入到 executableCalls 列表中,然后遍历每一个 AsyncCall 对象,执行它的 executeOn 方法。

void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    eventListener.callFailed(RealCall.this, ioException);
    responseCallback.onFailure(RealCall.this, ioException);
  } finally {
    if (!success) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}
复制代码

executeOn 方法执行的逻辑是:通过 ExecutorService (线程池)执行每一个 AsyncCall 请求对象,所以相应的 AsyncCall 对象的 run 方法会被执行,而 run 方法调用了 execute 方法。

protected void execute() {
    boolean signalledCallback = false;
    timeout.enter();
    try {
      Response response = getResponseWithInterceptorChain();
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      e = timeoutExit(e);
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        eventListener.callFailed(RealCall.this, e);
        responseCallback.onFailure(RealCall.this, e);
      }
    } finally {
      client.dispatcher().finished(this);
    }
  }
}
复制代码

execute 方法中,通过 getResponseWithInterceptorChain 方法返回 Response 对象。这里的 getResponseWithInterceptorChain 方法执行过程在同步请求时已经分析完了,这里不再重复说明。

至此,异步请求流程也已经分析完毕。和同步请求流程相对比,异步请求流程比同步流程多了一步也就是将请求对象进行分发并放到线程池中去执行,至于拦截器分层、发起网络请求、数据返回流程等都是一样的。

总结

OkHttp作为一个优秀的网络请求库,其主要运用责任链模式、分层思想、单一职责思想等设计模式与思想,通过每一层拦截器对请求Request和返回Response进行封装。隐藏了繁琐复杂的网络请求流程,提供简洁的API供开发者调用。通过源码解析,我们了解并学习了其中的设计原理与整体流程,做到了知其然也知其所以然。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章