Android 网络请求OkHttp3流程分析详解手机开发

基本概念

首先从使用出发,其次再结合源码来分析OkHttp3的内部实现的,建议大家下载 OkHttp 源码跟着本文,过一遍源码。首先来看一下OkHttp3的请求代码。

OkHttpClient client = new OkHttpClient(); 
 
String run(String url) throws IOException { 
  Request request = new Request.Builder() 
      .url(url) 
      .build(); 
 
  Response response = client.newCall(request).execute(); 
  return response.body().string(); 
}

OkHttp3的执行流程

  1. 创建OkHttpClient对象。OkHttpClient为网络请求执行的一个中心,它会管理连接池,缓存,SocketFactory,代理,各种超时时间,DNS,请求执行结果的分发等许多内容。
  2. 创建Request对象。Request用于描述一个HTTP请求,比如请求的方法是GET还是POST,请求的URL,请求的header,请求的body,请求的缓存策略等。
  3. 创建Call对象。Call是一次HTTP请求的Task,它会执行网络请求以获得响应。OkHttp中的网络请求执行Call既可以同步进行,也可以异步进行。调用call.execute()将直接执行网络请求,阻塞直到获得响应。而调用call.enqueue()传入回调,则会将Call放入一个异步执行队列,由ExecutorService在后台执行。
  4. 执行网络请求并获取响应。

上面的代码中涉及到几个常用的类:Request、Response和Call。下面就这几个类做详细的介绍。

Request

每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头,请求还可能包含一个特定内容类型的数据类的主体部分。

Response

响应是对请求的回复,包含状态码、HTTP头和主体部分。

Call

OkHttp使用Call抽象出一个满足请求的模型,尽管中间可能会有多个请求或响应。执行Call有两种方式,同步或异步。

那么首先来看一下OkHttpClient的源码实现。

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { 
  public OkHttpClient() { 
       this(new Builder()); 
  } 
  OkHttpClient(Builder builder) { 
    this.dispatcher = builder.dispatcher; 
    this.proxy = builder.proxy; 
    this.protocols = builder.protocols; 
    this.connectionSpecs = builder.connectionSpecs; 
    this.interceptors = Util.immutableList(builder.interceptors); 
    this.networkInterceptors = Util.immutableList(builder.networkInterceptors); 
    this.eventListenerFactory = builder.eventListenerFactory; 
    this.proxySelector = builder.proxySelector; 
    this.cookieJar = builder.cookieJar; 
    this.cache = builder.cache; 
    this.internalCache = builder.internalCache; 
    this.socketFactory = builder.socketFactory; 
 
    boolean isTLS = false; 
 
    this.hostnameVerifier = builder.hostnameVerifier; 
    this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner( 
        certificateChainCleaner); 
    this.proxyAuthenticator = builder.proxyAuthenticator; 
    this.authenticator = builder.authenticator; 
    this.connectionPool = builder.connectionPool; 
    this.dns = builder.dns; 
    this.followSslRedirects = builder.followSslRedirects; 
    this.followRedirects = builder.followRedirects; 
    this.retryOnConnectionFailure = builder.retryOnConnectionFailure; 
    this.connectTimeout = builder.connectTimeout; 
    this.readTimeout = builder.readTimeout; 
    this.writeTimeout = builder.writeTimeout; 
    this.pingInterval = builder.pingInterval; 
  } 
}

然后使用okHttpClient发起请求。例如:

okHttpClient.newCall(request).enqueue(new Callback() { 
  @Override 
  public void onFailure(Call call, IOException e) { 
 
 } 
 
@Override 
public void onResponse(Call call, Response response) throws IOException { 
 
} 
});

那接下来我们在看下Request。例如:

Request request = new Request.Builder().url("url").build();

该段代码主要实现初始化构建者模式和请求对象,并且用URL替换Web套接字URL。其源码如下:

public final class Request { 
    public Builder() { 
      this.method = "GET"; 
      this.headers = new Headers.Builder(); 
    } 
    public Builder url(String url) { 
      ...... 
 
      // Silently replace web socket URLs with HTTP URLs. 
      if (url.regionMatches(true, 0, "ws:", 0, 3)) { 
        url = "http:" + url.substring(3); 
      } else if (url.regionMatches(true, 0, "wss:", 0, 4)) { 
        url = "https:" + url.substring(4); 
      } 
 
      HttpUrl parsed = HttpUrl.parse(url); 
      ...... 
      return url(parsed); 
    } 
    public Request build() { 
      ...... 
      return new Request(this); 
    } 
}

我们来看一下okHttpClient的异步请求方式。

okHttpClient.newCall(request).enqueue(new Callback() { 
  @Override 
  public void onFailure(Call call, IOException e) { 
 
 } 
 
@Override 
public void onResponse(Call call, Response response) throws IOException { 
 
} 
});

而newCall又调用了RealCall函数,来看源码:

public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory { 
   @Override  
   public Call newCall(Request request) { 
    return new RealCall(this, request, false 
     /* for web socket */); 
   } 
}

RealCall实现了Call.Factory接口创建了一个RealCall的实例,而RealCall是Call接口的实现。继续看代码:

final class RealCall implements Call { 
   @Override  
   public void enqueue(Callback responseCallback) { 
   synchronized (this) { 
   if (executed) throw new IllegalStateException("Already Executed"); 
      executed = true; 
   } 
    captureCallStackTrace(); 
    client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback)); 
  } 
}

由上面的代码可以得出:

  • 检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用 call#clone方法进行克隆。
  • 利用 client.dispatcher().enqueue(this) 来进行实际执行,dispatcher 是刚才看到的OkHttpClient.Builder 的成员之一。
  • AsyncCall是RealCall的一个内部类并且继承NamedRunnable。
final class AsyncCall extends NamedRunnable { 
        private final Callback responseCallback; 
 
        AsyncCall(Callback responseCallback) { 
            super("OkHttp %s", new Object[]{RealCall.this.redactedUrl()}); 
            this.responseCallback = responseCallback; 
        } 
        ... 
}

而NamedRunnable又实现了Runnable接口,来看代码:

public abstract class NamedRunnable implements Runnable { 
  ...... 
 
  @Override  
  public final void run() { 
   ...... 
    try { 
      execute(); 
    } 
    ...... 
  } 
 
  protected abstract void execute(); 
}

可以看到NamedRunnable实现了Runnbale接口并且是个抽象类,其抽象方法是execute(),该方法是在run方法中被调用的,这也就意味着NamedRunnable是一个任务,并且其子类应该实现execute方法。下面再看AsyncCall的实现:

final class AsyncCall extends NamedRunnable { 
    private final Callback responseCallback; 
 
    AsyncCall(Callback responseCallback) { 
      super("OkHttp %s", redactedUrl()); 
      this.responseCallback = responseCallback; 
    } 
 
    ...... 
final class RealCall implements Call { 
  @Override protected void execute() { 
  boolean signalledCallback = false; 
  try { 
     Response response = getResponseWithInterceptorChain(); 
  if (retryAndFollowUpInterceptor.isCanceled()) { 
     signalledCallback = true; 
     responseCallback.onFailure(RealCall.this, new IOException("Canceled")); 
  } else { 
    signalledCallback = true; 
    responseCallback.onResponse(RealCall.this, response); 
  } 
 } catch (IOException e) { 
  ...... 
  responseCallback.onFailure(RealCall.this, e); 
 
} finally { 
    client.dispatcher().finished(this); 
  } 
}

AsyncCall实现了execute方法,首先是调用getResponseWithInterceptorChain()方法获取响应,然后获取成功后,就调用回调的onReponse方法,如果失败,就调用回调的onFailure方法,并调用Dispatcher的finished方法。

Dispatcher线程池介绍

那还看一下Dispatcher类的相关代码:

public final class Dispatcher { 
  /** 最大并发请求数为64 */ 
  private int maxRequests = 64; 
  /** 每个主机最大请求数为5 */ 
  private int maxRequestsPerHost = 5; 
 
  /** 线程池 */ 
  private ExecutorService executorService; 
 
  /** 准备执行的请求 */ 
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); 
 
  /** 正在执行的异步请求,包含已经取消但未执行完的请求 */ 
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); 
 
  /** 正在执行的同步请求,包含已经取消单未执行完的请求 */ 
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); 
  }

在OkHttp,使用如下构造了单例线程池,相关源码如下:

public synchronized ExecutorService executorService() { 
    if (executorService == null) { 
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, 
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); 
    } 
    return executorService; 
  }

executorService函数会构造一个线程池ExecutorService:

executorService = new ThreadPoolExecutor( 
//corePoolSize 最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁 
    0,  
//maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理 
    Integer.MAX_VALUE,  
//keepAliveTime: 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间 
    60,  
//单位秒 
    TimeUnit.SECONDS, 
//工作队列,先进先出 
    new SynchronousQueue<Runnable>(),    
//单个线程的工厂          
   Util.threadFactory("OkHttp Dispatcher", false));

可以看出,在Okhttp中,构建了一个核心为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做”OkHttp Dispatcher”的线程工厂。也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

synchronized void enqueue(AsyncCall call) { 
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { 
      runningAsyncCalls.add(call); 
      executorService().execute(call); 
    } else { 
      readyAsyncCalls.add(call); 
    } 
  }

从上述源码分析,如果当前还能执行一个并发请求,则加入 runningAsyncCalls ,立即执行,否则加入 readyAsyncCalls 队列。由此,可以得出Dispatcher的以下作用。

  • 调度线程池Disptcher实现了高并发,低阻塞的实现;
  • 采用Deque作为缓存,先进先出的顺序执行;
  • 任务在try/finally中调用了finished函数,控制任务队列的执行顺序,而不是采用锁,减少了编码复杂性提高性能。
try { 
        Response response = getResponseWithInterceptorChain(); 
        if (retryAndFollowUpInterceptor.isCanceled()) { 
          signalledCallback = true; 
          responseCallback.onFailure(RealCall.this, new IOException("Canceled")); 
        } else { 
          signalledCallback = true; 
          responseCallback.onResponse(RealCall.this, response); 
        } 
      } finally { 
        client.dispatcher().finished(this); 
      }

其流程可以用下图表示:
这里写图片描述

getResponseWithInterceptorChain方法

相关的方法源码如下:

Response getResponseWithInterceptorChain() throws IOException { 
    // Build a full stack of interceptors. 
    List<Interceptor> interceptors = new ArrayList<>(); 
    interceptors.addAll(client.interceptors()); 
    interceptors.add(retryAndFollowUpInterceptor); 
    interceptors.add(new BridgeInterceptor(client.cookieJar())); 
    interceptors.add(new CacheInterceptor(client.internalCache())); 
    interceptors.add(new ConnectInterceptor(client)); 
    if (!forWebSocket) { 
      interceptors.addAll(client.networkInterceptors()); 
    } 
    interceptors.add(new CallServerInterceptor(forWebSocket)); 
 
    Interceptor.Chain chain = new RealInterceptorChain( 
        interceptors, null, null, null, 0, originalRequest); 
    return chain.proceed(originalRequest); 
  }

从上述源码得知,不管okhttp有多少拦截器最后都会走,如下方法:

Interceptor.Chain chain = new RealInterceptorChain( 
        interceptors, null, null, null, 0, originalRequest); 
return chain.proceed(originalRequest);

从方法名字基本可以猜到是干嘛的,调用 chain.proceed(originalRequest); 将request传递进来,从拦截器链里拿到返回结果。那么看一下RealInterceptorChain类。

public final class RealInterceptorChain implements Interceptor.Chain { 
 
   public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation, 
        HttpCodec httpCodec, RealConnection connection, int index, Request request) { 
        this.interceptors = interceptors; 
        this.connection = connection; 
        this.streamAllocation = streamAllocation; 
        this.httpCodec = httpCodec; 
        this.index = index; 
        this.request = request; 
  } 
  ...... 
 
 @Override  
 public Response proceed(Request request) throws IOException { 
    return proceed(request, streamAllocation, httpCodec, connection); 
  } 
 
  public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, 
      RealConnection connection) throws IOException { 
    if (index >= interceptors.size()) throw new AssertionError(); 
 
    calls++; 
 
    ...... 
 
    // Call the next interceptor in the chain. 
    RealInterceptorChain next = new RealInterceptorChain( 
        interceptors, streamAllocation, httpCodec, connection, index + 1, request); 
    Interceptor interceptor = interceptors.get(index); 
    Response response = interceptor.intercept(next); 
 
   ...... 
 
    return response; 
  } 
 
  protected abstract void execute(); 
}

该类实现了Chain接口,在getResponseWithInterceptorChain调用时好几个参数都传的null。主要看proceed方法,proceed方法中判断index(此时为0)是否大于或者等于client.interceptors(List )的大小。由于httpStream为null,所以首先创建next拦截器链,主需要把索引置为index+1即可;然后获取第一个拦截器,调用其intercept方法。Interceptor 代码如下:

public interface Interceptor { 
  Response intercept(Chain chain) throws IOException; 
 
  interface Chain { 
    Request request(); 
 
    Response proceed(Request request) throws IOException; 
 
    Connection connection(); 
  } 
}

BridgeInterceptor从用户的请求构建网络请求,然后提交给网络,最后从网络响应中提取出用户响应。从最上面的图可以看出,BridgeInterceptor实现了适配的功能。下面是其intercept方法:

public final class BridgeInterceptor implements Interceptor { 
  ...... 
 
@Override  
public Response intercept(Chain chain) throws IOException { 
  Request userRequest = chain.request(); 
  Request.Builder requestBuilder = userRequest.newBuilder(); 
 
 RequestBody body = userRequest.body(); 
 //如果存在请求主体部分,那么需要添加Content-Type、Content-Length首部 
 if (body != null) { 
      MediaType contentType = body.contentType(); 
      if (contentType != null) { 
        requestBuilder.header("Content-Type", contentType.toString()); 
      } 
 
      long contentLength = body.contentLength(); 
      if (contentLength != -1) { 
        requestBuilder.header("Content-Length", Long.toString(contentLength)); 
        requestBuilder.removeHeader("Transfer-Encoding"); 
      } else { 
        requestBuilder.header("Transfer-Encoding", "chunked"); 
        requestBuilder.removeHeader("Content-Length"); 
      } 
    } 
 
    if (userRequest.header("Host") == null) { 
      requestBuilder.header("Host", hostHeader(userRequest.url(), false)); 
    } 
 
    if (userRequest.header("Connection") == null) { 
      requestBuilder.header("Connection", "Keep-Alive"); 
    } 
 
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing 
    // the transfer stream. 
    boolean transparentGzip = false; 
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { 
      transparentGzip = true; 
      requestBuilder.header("Accept-Encoding", "gzip"); 
    } 
 
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url()); 
    if (!cookies.isEmpty()) { 
      requestBuilder.header("Cookie", cookieHeader(cookies)); 
    } 
 
  if (userRequest.header("User-Agent") == null) { 
      requestBuilder.header("User-Agent", Version.userAgent()); 
  } 
 
Response networkResponse = chain.proceed(requestBuilder.build()); 
 
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers()); 
 
Response.Builder responseBuilder = networkResponse.newBuilder() 
        .request(userRequest); 
 
    if (transparentGzip 
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding")) 
        && HttpHeaders.hasBody(networkResponse)) { 
      GzipSource responseBody = new GzipSource(networkResponse.body().source()); 
      Headers strippedHeaders = networkResponse.headers().newBuilder() 
          .removeAll("Content-Encoding") 
          .removeAll("Content-Length") 
          .build(); 
      responseBuilder.headers(strippedHeaders); 
      responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody))); 
    } 
 
    return responseBuilder.build(); 
  } 
 
  /** Returns a 'Cookie' HTTP request header with all cookies, like [email protected] a=b; c=d}. */ 
  private String cookieHeader(List<Cookie> cookies) { 
    StringBuilder cookieHeader = new StringBuilder(); 
    for (int i = 0, size = cookies.size(); i < size; i++) { 
      if (i > 0) { 
        cookieHeader.append("; "); 
      } 
      Cookie cookie = cookies.get(i); 
      cookieHeader.append(cookie.name()).append('=').append(cookie.value()); 
    } 
    return cookieHeader.toString(); 
  } 
}

从上面的代码可以看出,首先获取原请求,然后在请求中添加头,比如Host、Connection、Accept-Encoding参数等,然后根据看是否需要填充Cookie,在对原始请求做出处理后,使用chain的procced方法得到响应,接下来对响应做处理得到用户响应,最后返回响应。再看下一个拦截器ConnectInterceptor的处理:

public final class ConnectInterceptor implements Interceptor { 
  ...... 
 
 @Override  
 public Response intercept(Chain chain) throws IOException { 
 RealInterceptorChain realChain = (RealInterceptorChain) chain; 
Request request = realChain.request(); 
StreamAllocation streamAllocation = realChain.streamAllocation(); 
 
 // We need the network to satisfy this request. Possibly for validating a conditional GET. 
 boolean doExtensiveHealthChecks = !request.method().equals("GET"); 
 HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); 
 RealConnection connection = streamAllocation.connection(); 
 
 return realChain.proceed(request, streamAllocation, httpCodec, connection); 
  } 
}

实际上建立连接就是创建了一个 HttpCodec 对象,它利用 Okio 对 Socket 的读写操作进行封装,Okio 以后有机会再进行分析,现在让我们对它们保持一个简单地认识:它对 java.io 和 java.nio 进行了封装,让我们更便捷高效的进行 IO 操作。

CallServerInterceptor

CallServerInterceptor是拦截器链中最后一个拦截器,负责将网络请求提交给服务器。

@Override  
public Response intercept(Chain chain) throws IOException { 
    RealInterceptorChain realChain = (RealInterceptorChain) chain; 
    HttpCodec httpCodec = realChain.httpStream(); 
    StreamAllocation streamAllocation = realChain.streamAllocation(); 
    RealConnection connection = (RealConnection) realChain.connection(); 
    Request request = realChain.request(); 
 
    long sentRequestMillis = System.currentTimeMillis(); 
    httpCodec.writeRequestHeaders(request); 
 
    Response.Builder responseBuilder = null; 
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) { 
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100 
      // Continue" response before transmitting the request body. If we don't get that, return what 
      // we did get (such as a 4xx response) without ever transmitting the request body. 
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) { 
        httpCodec.flushRequest(); 
        responseBuilder = httpCodec.readResponseHeaders(true); 
      } 
 
      if (responseBuilder == null) { 
        // Write the request body if the "Expect: 100-continue" expectation was met. 
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength()); 
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut); 
        request.body().writeTo(bufferedRequestBody); 
        bufferedRequestBody.close(); 
      } else if (!connection.isMultiplexed()) { 
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from 
        // being reused. Otherwise we're still obligated to transmit the request body to leave the 
        // connection in a consistent state. 
        streamAllocation.noNewStreams(); 
      } 
    } 
 
    httpCodec.finishRequest(); 
 
    if (responseBuilder == null) { 
      responseBuilder = httpCodec.readResponseHeaders(false); 
    } 
 
    Response response = responseBuilder 
        .request(request) 
        .handshake(streamAllocation.connection().handshake()) 
        .sentRequestAtMillis(sentRequestMillis) 
        .receivedResponseAtMillis(System.currentTimeMillis()) 
        .build(); 
 
    int code = response.code(); 
    if (forWebSocket && code == 101) { 
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body. 
      response = response.newBuilder() 
          .body(Util.EMPTY_RESPONSE) 
          .build(); 
    } else { 
      response = response.newBuilder() 
          .body(httpCodec.openResponseBody(response)) 
          .build(); 
    } 
 
    if ("close".equalsIgnoreCase(response.request().header("Connection")) 
        || "close".equalsIgnoreCase(response.header("Connection"))) { 
      streamAllocation.noNewStreams(); 
    } 
 
    if ((code == 204 || code == 205) && response.body().contentLength() > 0) { 
      throw new ProtocolException( 
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength()); 
    } 
 
    return response; 
  }

从上面的代码中可以看出,首先获取HttpStream对象,然后调用writeRequestHeaders方法写入请求的头部,然后判断是否需要写入请求的body部分,最后调用finishRequest()方法将所有数据刷新给底层的Socket,接下来尝试调用readResponseHeaders()方法读取响应的头部,然后再调用openResponseBody()方法得到响应的body部分,最后返回响应。

总结

最后我们用一张图来总结ohhttp的整个请求流程。
这里写图片描述

OkHttp的底层是通过Java的Socket发送HTTP请求与接受响应的(,但是OkHttp实现了连接池的概念,即对于同一主机的多个请求,其实可以公用一个Socket连接,而不是每次发送完HTTP请求就关闭底层的Socket,这样就实现了连接池的概念,而且OkHttp对Socket的读写操作使用的OkIo库进行了一层封装。

原创文章,作者:奋斗,如若转载,请注明出处:https://blog.ytso.com/5854.html

(0)
上一篇 2021年7月17日
下一篇 2021年7月17日

相关推荐

发表回复

登录后才能评论