首頁 > 軟體

Java OkHttp框架原始碼深入解析

2022-08-26 14:03:36

1.OkHttp發起網路請求

可以通過OkHttpClient發起一個網路請求

//建立一個Client,相當於開啟一個瀏覽器
 OkHttpClient okHttpClient = new OkHttpClient.Builder().build();
 //建立一個請求。
        Request request = new Request.Builder()
                .url("http://www.baidu.com")
                .method("GET",null)
                .build();
    //呼叫Client 建立一個Call。
        Call call = okHttpClient.newCall(request);
        //Call傳入一個回撥函數,並加入到請求佇列。
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
            }
        });
}

通過Retrofit發起一個OkHttp請求

 Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://www.baidu.com/")
                .build();
        NetInterface netInterface = retrofit.create(NetInterface.class);
        Call<Person> call = netInterface.getPerson();
        call.enqueue(new Callback<Person>() {
            @Override
            public void onResponse(Call<Person> call, Response<Person> response) {
            }
            @Override
            public void onFailure(Call<Person> call, Throwable t) {
            }
 });

以上兩種方式都是通過call.enqueue() 把網路請求加入到請求佇列的。

這個call是RealCall的一個物件。

 public void enqueue(Callback responseCallback) {
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

這裡有兩個判斷條件

runningAsyncCalls.size() < maxRequests如果執行佇列數量大於最大數量,

runningCallsForHost(call) < maxRequestsPerHost並且存取同一臺伺服器的請求數量大於最大數量,請求會放入等待佇列,否則加入執行佇列,直接執行。

//等待佇列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//執行佇列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
//執行佇列數量最大值
private int maxRequests = 64;
//存取不同主機的最大數量
private int maxRequestsPerHost = 5;
dispatcher.java
 synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

接下來看這行程式碼executorService().execute(call);

executorService()拿到一個執行緒池範例,

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;

execute(call)執行任務,發起網路請求。

 //AsyncCall.java
 @Override protected void execute() {
       try {
       //這個方法去請求網路,會返回Respose
        Response response = getResponseWithInterceptorChain();
        //請求成功,回撥介面
         responseCallback.onResponse(RealCall.this, response);
       }catch(Exceptrion e){
            //失敗回撥
         responseCallback.onFailure(RealCall.this, e);
       }finally {
          //從當前執行佇列中刪除這個請求
          client.dispatcher().finished(this);
      }
 }

getResponseWithInterceptorChain()

這行程式碼,使用了設計模式中的責任鏈模式。

 //這個方法命名:通過攔截器鏈,獲取Response
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
     // 這個我們自己定義的攔截器。
    interceptors.addAll(client.interceptors());
    //重試和重定向攔截器
    interceptors.add(retryAndFollowUpInterceptor);
    //請求頭攔截器
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //快取攔截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //連線攔截器
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //存取攔截器
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //攔截器責任鏈
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    //執行攔截器集合中的攔截器
    return chain.proceed(originalRequest);
  }

責任鏈模式中,鏈條的上游持有下游物件的參照。這樣能夠保證在鏈條上的每一個物件,都能對其符合條件的任務進行處理。

但是在上面的攔截器構成責任鏈中,是把攔截器,放在了一個集合中。

第一個引數interceptors 是一個攔截器的集合。

第五個引數0是集合的index,RealInterceptorChain就是根據這個索引值+1,

對chain.proceed方法迴圈呼叫,進行集合遍歷,並執行攔截器中定義的方法的。

這個責任鏈模式,並沒有明確的指定下游物件是什麼,而是通過集合index值的變化,動態的指定的。

 Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0......)
   chain.proceed(originalRequest);
   public Response proceed(Request request,...){
    //構建一個index+1的攔截器鏈
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
                connection, index + 1,....);
        //拿到當前的攔截器
        Interceptor interceptor = interceptors.get(index);
        //呼叫攔截器intercept(next)方法,
        //在這個方法中繼續呼叫realChain.proceed(),從而進行迴圈呼叫,index索引值再加1.
        Response response = interceptor.intercept(next);
 }

2.OkHttp的聯結器

1)RetryAndFollowUpInterceptor:重試和重定向攔截器

public Response intercept(Chain chain){
      while (true) {
        Response response;
          try {
          //建立StreamAllocation物件,這個物件會在連線攔截器中用到
            StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
                  createAddress(request.url()), call, eventListener, callStackTrace);
              this.streamAllocation = streamAllocation;
             呼叫責任鏈下游攔截器
             response = realChain.proceed(request, streamAllocation, null, null);
            } catch (RouteException e) {
                 // The attempt to connect via a route failed. The request will not have been sent.
                 路由異常,請求還沒發出去。
                 這樣這個recover(),如果返回的是false,則丟擲異常,不再重試
                 如果返回的是true,則執行下面的continue,進行下一次while迴圈,進行重試,重新發起網路請求。
                 if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
                   throw e.getFirstConnectException();
                 }
                 releaseConnection = false;
                continue;
             } catch (IOException e) {
                 // An attempt to communicate with a server failed. The request may have been sent.
                 請求已經發出去了,但是和伺服器連線失敗了。
                 這個recover()返回值的處理邏輯和上面異常一樣。
                 boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
                 if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
                 releaseConnection = false;
                 continue;
               }
             } finally {//finally是必定會執行到的,不管上面的catch中執行的是continue還是thow
                // We're throwing an unchecked exception. Release any resources.
                if (releaseConnection) {
                  streamAllocation.streamFailed(null);
                  streamAllocation.release();
                }
              }
             在這個重試攔截器中,okhttp的做法很巧妙。先是在外面有一個while迴圈,如果發生異常,
             會在recover方法中對異常型別進行判斷,如果不符合屬於重試,則返回false,並thow e,結束while迴圈。
             如果符合重試的條件,則返回true,在上面的catch程式碼塊中執行continue方法,進入下一個while迴圈。
            //如果請求正常,並且返回了response,則會進行重定向的邏輯判斷
            followUpRequest在這個方法中會根據ResponseCode,狀態碼進行重定向的判斷,
            Request followUp;
                 try {
                   followUp = followUpRequest(response, streamAllocation.route());
                 } catch (IOException e) {
                   streamAllocation.release();
                   throw e;
                 }
                 如果flolowUp 為null,則不需要重定向,直接返回response
                 if (followUp == null) {
                   if (!forWebSocket) {
                     streamAllocation.release();
                   }
                   return response;
                 }
                  如果flolowUp 不為null,則進行重定向了請求
               如果重定向次數超過MAX_FOLLOW_UPS=20次,則丟擲異常,結束while迴圈
              if (++followUpCount > MAX_FOLLOW_UPS) {
                     streamAllocation.release();
                     throw new ProtocolException("Too many follow-up requests: " + followUpCount);
                   }
                   if (followUp.body() instanceof UnrepeatableRequestBody) {
                     streamAllocation.release();
                     throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
                   }
                   if (!sameConnection(response, followUp.url())) {
                     streamAllocation.release();
                     //從重定向請求中拿到url,封裝一個新的streamAllocation物件,
                     streamAllocation = new StreamAllocation(client.connectionPool(),
                         createAddress(followUp.url()), call, eventListener, callStackTrace);
                     this.streamAllocation = streamAllocation;
                   } else if (streamAllocation.codec() != null) {
                     throw new IllegalStateException("Closing the body of " + response
                         + " didn't close its backing stream. Bad interceptor?");
                   }
                   //將重定向請求賦值給request 進入下一個重定向的請求的while迴圈,繼續走上面的while迴圈程式碼
                   request = followUp;
                   priorResponse = response;
                 }
 }
   //只有這個方法返回值為false都不進行重試。
   private boolean recover(IOException e, StreamAllocation streamAllocation,
       boolean requestSendStarted, Request userRequest) {
     streamAllocation.streamFailed(e);
     // The application layer has forbidden retries.
     應用層禁止重試。可以通過OkHttpClient進行設定(預設是允許的)
     if (!client.retryOnConnectionFailure()) return false;
     // We can't send the request body again.
     if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
     // This exception is fatal. 致命的異常
     判斷是否屬於重試的異常
     if (!isRecoverable(e, requestSendStarted)) return false;
     // No more routes to attempt.
     沒有更多可以連線的路由線路
     if (!streamAllocation.hasMoreRoutes()) return false;
     // For failure recovery, use the same route selector with a new connection.
     return true;
   }
  只有這個方法返回false,都不進行重試。
 private boolean isRecoverable(IOException e, boolean requestSendStarted) {
   // If there was a protocol problem, don't recover.
   出現了協定異常,不再重試
   if (e instanceof ProtocolException) {
     return false;
   }
   // If there was an interruption don't recover, but if there was a timeout connecting to a route
   // we should try the next route (if there is one).
   requestSendStarted為false時,並且異常型別為Scoket超時異常,將會進行下一次重試
   if (e instanceof InterruptedIOException) {
     return e instanceof SocketTimeoutException && !requestSendStarted;
   }
   // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
   // again with a different route.
   如果是一個握手異常,並且證書出現問題,則不能重試
   if (e instanceof SSLHandshakeException) {
     // If the problem was a CertificateException from the X509TrustManager,
     // do not retry.
     if (e.getCause() instanceof CertificateException) {
       return false;
     }
   }

2)BridgeInterceptor 橋攔截器:連線伺服器的橋樑,主要是在請求頭中設定一些引數設定

如:請求內容長度,編碼,gzip壓縮等。

public Response intercept(Chain chain) throws IOException {
     Request userRequest = chain.request();
     Request.Builder requestBuilder = userRequest.newBuilder();
     RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }
      ..................
    }
    在請求頭中新增gizp,是否壓縮
  boolean transparentGzip = false;
     if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
       transparentGzip = true;
       requestBuilder.header("Accept-Encoding", "gzip");
     }
    //cookies
     List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
     if (!cookies.isEmpty()) {
       requestBuilder.header("Cookie", cookieHeader(cookies));
     }
     呼叫責任鏈中下一個攔截器的方法,網路請求得到的資料封裝到networkResponse中
     Response networkResponse = chain.proceed(requestBuilder.build());
    對cookie進行處理
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    如果設定了gzip,則會對networkResponse進行解壓縮。
     if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    return responseBuilder.build();
}

3)CacheInterceptor快取攔截器

public Response intercept(Chain chain){
   //  this.cache = DiskLruCache.create(fileSystem, directory, 201105, 2, maxSize);
    這個快取在底層使用的是DiskLruCache
    //以request為key從快取中拿到response。
     Response cacheCandidate = cache != null
            ? cache.get(chain.request()): null;
     long now = System.currentTimeMillis();
     //快取策略
     CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
     Request networkRequest = strategy.networkRequest;
     Response cacheResponse = strategy.cacheResponse;
   // If we're forbidden from using the network and the cache is insufficient, fail.
   //如果請求和響應都為null,直接返回504
   if (networkRequest == null && cacheResponse == null) {
     return new Response.Builder()
         .request(chain.request())
         .protocol(Protocol.HTTP_1_1)
         .code(504)
         .message("Unsatisfiable Request (only-if-cached)")
         .body(Util.EMPTY_RESPONSE)
         .sentRequestAtMillis(-1L)
         .receivedResponseAtMillis(System.currentTimeMillis())
         .build();
   }
   // If we don't need the network, we're done.
   //如果請求為null,快取不為null,則直接使用快取。
       if (networkRequest == null) {
         return cacheResponse.newBuilder()
             .cacheResponse(stripBody(cacheResponse))
             .build();
       }
     Response networkResponse = null;
        try {
          //呼叫責任鏈下一個攔截器
          networkResponse = chain.proceed(networkRequest);
        } finally {
        }
      Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
     // Offer this request to the cache.
     //將響應存入快取。
      CacheRequest cacheRequest = cache.put(response);
}

4)ConnectInterceptor 連線攔截器。當一個請求發出,需要建立連線,然後再通過流進行讀寫。

public Response intercept(Chain chain) throws IOException {
     RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    //在重定向攔截器中建立,
    StreamAllocation streamAllocation = realChain.streamAllocation();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //從連線池中,找到一個可以複用的連線,
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
   // RealConnection 中封裝了一個Socket和一個Socket連線池
    RealConnection connection = streamAllocation.connection();
    //呼叫下一個攔截器
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
//遍歷連線池
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }
  public boolean isEligible(Address address, @Nullable Route route) {
    // If this connection is not accepting new streams, we're done.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;
    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    // If the host exactly matches, we're done: this connection can carry the address.
    從連線池中找到一個連線引數一致且並未佔用的連線
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
  }

5)CallServerInterceptor 請求伺服器攔截器

/** This is the last interceptor in the chain. It makes a network call to the server. */
這是責任鏈中最後一個攔截器,這個會去請求伺服器。
 public Response intercept(Chain chain) throws IOException {
      RealInterceptorChain realChain = (RealInterceptorChain) chain;
      HttpCodec httpCodec = realChain.httpStream();
      StreamAllocation streamAllocation = realChain.streamAllocation();
      RealConnection connection = (RealConnection) realChain.connection();
      Request request = realChain.request();
      //將請求頭寫入快取
      httpCodec.writeRequestHeaders(request);
      return response;

到此這篇關於Java OkHttp框架原始碼深入解析的文章就介紹到這了,更多相關Java OkHttp框架內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com