基于版本okhttp 4.4.0 通过kotlin重写,简洁很多,流程很清晰

1. 请求流程

通过一个简单的异步请求,来查看整个请求过程

 fun testCall(){
 val okHttpClient: OkHttpClient = 
        OkHttpClient.Builder()
            .build()
        val request = Request.Builder()
            .get()
            .url("http://t.weather.sojson.com/api/weather/city/101030100")
            .build()

        okHttpClient.newCall(request).enqueue(object : Callback{
            override fun onFailure(call: Call, e: IOException) {
                System.out.println("testCall,onFailure")

            }

            override fun onResponse(call: Call, response: Response) {
                val json = response.body?.string()
                System.out.println("testCall,onSuccess:json:$json")
            }

        })

    }

可以看到Okhttp对外的api很简洁易用。需要构建三个对象,OkhttpClient,Request,异步需要Callback.通过两个方法串联起三个对象。newCall(request),enqueue(callback).我们的解析也是围绕着newCall,enqueue开始。

1. newCall,enqueue
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

很简单,返回一个RealCall(this,request).RealCall是对request的封装。然后RealCall调用enqueue()

 override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }

会把executed置为true,并callStart()开始一个eventListener回调。最后一行核心代码把一个AsyncCall传递给client.dispatcher,如队列并开始请求。AsyncCall是一个RealCall的内部类,并且继承自Runnable.最终会通过executeOn,执行到run方法,开始进行网络请求。然后返回结果执行responseCallback的回调,回调方法都在run方法中执行,运行在线程池中。所有回调方法都是在异步线程中。

  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable

最关键的怎么获取response的部分

  override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } 
      ....
    }

可见是通过getResponseWithInteceptorChain()回去Response,然后通过resonseCallback.onResponse回调回去。到这里我们还是先回到dispatcher.enqueue(asyncCall)最终是怎么执行到AsyncCall的runn方法的。

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

Dispatcher有三个AsyncCall队列分别是。

  1. readyAsyncCalls: ArrayQueue
  2. runningAsyncCalls: ArrayQueue
  3. runningSyncCalls: ArrayQueueu

在enqueue方法中,先把asyncCall加入到readyAsyncCalls中。并且通过findExistingCallWithHost(call.host),通过host 查找是否存在host相同的call,如果存在则复用。这里稍等之后分析。promoteAndExecute()则是开启请求的地方。

  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
        // 判断完毕,从readyAsyncCalls中移除
        i.remove()
        // callsPerHost 加1
        asyncCall.callsPerHost.incrementAndGet()
        // 可执行的execueteCalls添加asyncCall
        executableCalls.add(asyncCall)
        // 正在运行的runningAsyncCalls添加
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    // 开始执行executeableCalls ,在线程池中执行
    // executorService 是可以OkhttpClient初始化传入
    // 默认使用dispatcher的 executorService
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      // 开始在asyncCall中执行,run方法
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

为了防止线程安全问题加了synchronized关键字,遍历readyAsyncCallsk,判断是否runningAsyncCalls正在运行的请求超过最大值,如果超过则放弃此次请求。然后判断asyncCall.callsPerhost 对同一个host的请求是否超过最大值,超过则continue,下一个。

getResponseWithInterceptorChain()开始请求

这是okhttp 设计中非常精妙的一点通过责任链设计模式,把完成请求的一个个步骤串联起来。通过拦截器将请求-响应整个过程穿起来。把代码逻辑分散,不同拦截器执行不同的任务,okhttp默认有四个非常重要的拦截器,分别是:RetryAndFollowUpIntercetpor,BridgeInterceptor,CacheIntercetpor,ConnectInteceptor,CallServerIntercetpor.而且添加顺序也很重要。

  internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    // 添加用户自定义的interceptors
    interceptors += client.interceptors
    interceptors +=
    // 请求重试
    RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    // 缓存的Intercetpor ,需要重头分析
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // 从服务器获取连接
    interceptors += CallServerInterceptor(forWebSocket)

    // InterceptorChain 拦截器链,很经典的设计
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
    // 通过chain.proceed启动连接器,执行请求。
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    ...
  }

拦截器的实现非常巧妙。RealInterceptorChain实现Interceptor.Chain,proceed(request)方法。责任链模式的重点在于链式调用,但okhttp明显是一个interceptors List。不是链,那是怎样完成一个链式结构的呢。重点在于RealInterceptorChain,这个类扮演了链表中的节点,在proceed方法中,调用Interceptor的intercept,通过对index +1 ,取出intercetpros List中的下一个interceptor并新建一个RealInterceptor ,再次传入到intercetpor.intercept(chain)中。这样在Interceptor中调用,chain.proceed(request)就像是直接调用链表的下一个节点的方法。

从Interceptors 第一个元素开始调用intrecept方法。在方法里面在调用chain.proceed(request)之前,对request 的所有操作都是顺序执行,从Interceptor1到InterceptorN 顺序执行。在response = chain.proceed(request)之后则反过来,一个倒U形结构。反过来是指,对Response的操作是从InterceptorN 开始到Interceptor1.也许会有疑问这样一直到链表下一个节点,最后是哪一个?在getResponseWithInterceptorChain方法中,依然赋值了,最后一个是CallServerInterceptor,并且chain.proceed()方法对index做了检查。CallServerInterceptor 没有调用chain.proceed 如果调用了则会报异常。通过责任链实现的Request-Response 整个链中,可以做对request,Response的各种操作。用于日志打印,缓存等操作。

非常重要的五个拦截器

  1. RetryAndFlowUpInterceptor
  2. BridgeInterceptor
  3. CacheInterceptor
  4. ConnectInterceptor
  5. CallServerInterceptor

后三个拦截器非常重要,涉及到缓存,简历http连接,网络请求的操作。而这些操作是对网络请求最重要的操作。CacheInterceptor相对简单,这里从ConnectInterceptor开始分析,看okhttp的连接是怎么建立的。

1. ConnectIntercetpor

可以看到ConnectInterceptor 的实现非常简单,主要是返回一个新的Exchange设置到Chain中

object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}

大量的细节在realChaihn.call.initExchange(chain)中。

  internal fun initExchange(chain: RealInterceptorChain): Exchange {
    ...
    val codec = exchangeFinder!!.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder!!, codec)
    this.interceptorScopedExchange = result
    ...
      return result
  }

新创建了一个Exchange,并从exchangeFinder.find获取了codec。

2. 拦截器 CallServerInterceptor

这个拦截器很重要,通过ConnectInterceptor中建立的连接Exchagne开始网络请求。

override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    ...

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
   ...
    return response
  }

省略一些细节,看到关键的获取ResponseBody的部分在于exchain.openResponseBody(response),这里看到exchange怎么实现的。

 fun openResponseBody(response: Response): ResponseBody {
    try {
      val contentType = response.header("Content-Type")
      val contentLength = codec.reportedContentLength(response)
      val rawSource = codec.openResponseBodySource(response)
      // 关键代码
      val source = ResponseBodySource(rawSource, contentLength)
      return RealResponseBody(contentType, contentLength, source.buffer())
 ...
  }

可以看到关键的rawSource是通过codec.openResponseBodySource(response)获取。codec是ExchangeCodec接口类型。它的获取是在RealCall的InitExchange中通过exchangeFinder获取的。那我们从这里看它的实现类是谁。

 fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
 ...
  }

这里的目的是获取RealConnection并通过newCodec获取一个ExchangeCodec。findHealthyConnection会调用到findConnection 这个方法比较复杂,主要涉及到connectionPool对连接进行复用,需要展开分析

 private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
  ): RealConnection {
    var foundPooledConnection = false
    var result: RealConnection? = null


    var routes: List<Route>? = null
    synchronized(connectionPool) {
   ...
        routes = routeSelection!!.routes
        if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
          foundPooledConnection = true
          result = call.connection
        }
      }
      
          // Do TCP + TLS handshakes. This is a 
          // 特别重要建立连接
    result!!.connect(
        connectTimeout,
        readTimeout,
        writeTimeout,
        pingIntervalMillis,
        connectionRetryEnabled,
        call,
        eventListener
    )
    call.client.routeDatabase.connected(result!!.route())

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection!!.next()
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        result = RealConnection(connectionPool, selectedRoute!!)
        connectingConnection = result
      }
    }

  ...

   ...
   ...
      } else {
        connectionPool.put(result!!)
        call.acquireConnectionNoEvents(result!!)
      }
    }
   ...
    return result!!
  }

大概的过程是先从连接池获取RealConnection,如果没有则新建一个RealConnection,并put到connectionPool。对于connect的管理也是okhttp进行网络复用很重要的一点。可以看到callAcquirePooledConnection 这个方法

  fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    routes: List<Route>?,
    requireMultiplexed: Boolean
  ): Boolean {
    this.assertThreadHoldsLock()

    for (connection in connections) {
      if (requireMultiplexed && !connection.isMultiplexed) continue
      if (!connection.isEligible(address, routes)) continue
      call.acquireConnectionNoEvents(connection)
      return true
    }
    return false
  }

有两个判断重点是connect.isEligible()方法(是否合格)决定是否复用连接。如果可以复用则通过call.acquireConnectionNoEvents(connection)把连接设置到RealCall中。

  internal fun isEligible(address: Address, routes: List<Route>?): Boolean {

    if (calls.size >= allocationLimit || noNewExchanges) return false

    // If the non-host fields of the address don't overlap, we're done.
    if (!this.route.address.equalsNonHost(address)) return false

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url.host == this.route().address.url.host) {
      return true // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false

    // 2. The routes must share an IP address.
    if (routes == null || !routeMatchesAny(routes)) return false

    // 3. This connection's server certificate's must cover the new host.
    if (address.hostnameVerifier !== OkHostnameVerifier) return false
    if (!supportsUrl(address.url)) return false

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
    } catch (_: SSLPeerUnverifiedException) {
      return false
    }
 connection.
  }

可以看到具体的规则写的很清楚了。首先要address 相等。host和route host相等直接复用。接着写明是http2才可以复用。如果是htt2,则要share IP Address,证书必须匹配host才可以复用。
接下来看realConnection.newCodec()。并且在这里connect建立连接。底层是socket连接。

 internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    ...
    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      ...
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

可以看到如果http2Connection 不为空则使用Http2ExchangeCodec否则使用HTTP1ExchagneCodec。传入关键的source到ExchangeCodec然后在ResponseBody中读取。这里看realConnection中的source怎么获取的。

  private fun connectSocket(
    connectTimeout: Int,
    readTimeout: Int,
    call: Call,
    eventListener: EventListener
  ) {
    val proxy = route.proxy
    val address = route.address
    
    // 建立socket连接,非常重要
    val rawSocket = when (proxy.type()) {
      Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
      else -> Socket(proxy)
    }
    this.rawSocket = rawSocket

    eventListener.connectStart(call, route.socketAddress, proxy)
    rawSocket.soTimeout = readTimeout
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress, connectTimeout)
    } catch (e: ConnectException) {
      throw ConnectException("Failed to connect to ${route.socketAddress}").apply {
        initCause(e)
      }
    }

    try {
      source = rawSocket.source().buffer()
      sink = rawSocket.sink().buffer()
    } catch (npe: NullPointerException) {
      if (npe.message == NPE_THROW_WITH_NULL) {
        throw IOException(npe)
      }
    }
  }

非常关键的代码通过socket建立连接,并且返回source.source也就是ResponseBody 的sourse来源。整个网络请求过程简单总结:

  1. 通过Exchange 作为门面管理网络请求,获取到codec
  2. ExchangeCodec管理connection.在这个过程中会findConnection.优先从连接池中获取connection.如果没有则新建,并connect
  3. 整个网络请求底层是通过Socket建立,okhttp也就是对socket的封装,通过socket ,connect 之后开始获取数据
  4. 通过okio 获取数据并返回到CallServerInterceptor,最终在整个责任链中进行response处理。
3. 缓存 CacheInterceptor

接下来分析关于缓存的处理,缓存对于网络请求,在节约流量和相应速度和系统资源都是很重要的。okhttp的缓存采用CacheInterceptor封装,具体的实现由不同的CacheStrategy.涉及到的类有Cache,DiskLruCache,CacheStrategy.缓存的主要入口在Cache中。cache 是作为OkhttpClient的一个成员属性,并且没有默认初始化,需要通过cache(cache:Cache)设置Cache需要的目录和缓存大小。

  override fun intercept(chain: Interceptor.Chain): Response {
    // 从cache中通过request获取cacheCandidate
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()
    
    // 构建缓存策略稍后具体分析
    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)

    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // 如果networkRequest 为null 并且cacheResponse也为空。则
    // 请求失败,返回空的Empty Response
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    // If we don't need the network, we're done.
    // 不需要网络请求直接使用缓存
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }

    var networkResponse: Response? = null
    try {
    // 开始进行网络请求,获取networkResponse
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
    // 如果请求的response 为更改,为HTTP_NOT_MODIFIED则
    // 则直接使用cacheResponse 继续作为cacheResponse保存
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        // 对缓存进行更新
        cache.update(cacheResponse, response)
        return response
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }
    
    // 如果是网络请求获取的networkResponse为最新则获取新的netwrokResponse
    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        // 新的response 添加到cache中
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)
      }

    // 无效的请求移除cache
      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

CacheIntercetpor中的代码逻辑几乎没有多余的,都是关键的逻辑代码。梳理一下主要一些流程。从cache中通过request获取cacheCandidate。然后构建cacheStrategy. 在通过Strtegy获取的netwrokRequest和cacheResponse 判断返回缓存还是继续请求。如果请求的结果未更新,则更新缓存。如果是更新过的networkResposne则重新把缓存添加到cache中。接下来看strategy是怎么做到的,以及cache 如何实现对缓存的添加和获取。Cache采用DishLrucache实现,内置cache成员。

  internal fun get(request: Request): Response? {
  // 通过url 获取md5值作为key
    val key = key(request.url)
    // 从DiskLrucache中获取snapshot
    val snapshot: DiskLruCache.Snapshot = try {
      cache[key] ?: return null
    } catch (_: IOException) {
      return null // Give up because the cache cannot be read.
    }

    val entry: Entry = try {
      Entry(snapshot.getSource(ENTRY_METADATA))
    } catch (_: IOException) {
      snapshot.closeQuietly()
      return null
    }

    // 从dishlrucache中获取resposne返回
    val response = entry.response(snapshot)
    if (!entry.matches(request, response)) {
      response.body?.closeQuietly()
      return null
    }

    return response
  }

接下来看怎么添加缓存

  internal fun put(response: Response): CacheRequest? {
    val requestMethod = response.request.method

    // 判断缓存是否有效,根据请求类型来判断
    if (HttpMethod.invalidatesCache(response.request.method)) {
      try {
        remove(response.request)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
      return null
    }

    // 非GET请求不进行缓存
    if (requestMethod != "GET") {
      // Don't cache non-GET responses. We're technically allowed to cache HEAD requests and some
      // POST requests, but the complexity of doing so is high and the benefit is low.
      return null
    }

    if (response.hasVaryAll()) {
      return null
    }

    // 构建Entry 保存进Lrucache中
    val entry = Entry(response)
    var editor: DiskLruCache.Editor? = null
    try {
      editor = cache.edit(key(response.request.url)) ?: return null
      entry.writeTo(editor)
      return RealCacheRequest(editor)
    } catch (_: IOException) {
      abortQuietly(editor)
      return null
    }
  }

参考资料