温馨提示:这篇文章已超过443天没有更新,请注意相关的内容是否还可用!
摘要:本文将详细介绍Android开发中广泛使用的网络请求框架OkHttp。文章涵盖了OkHttp的核心功能、使用方法和优势,包括其高效的网络请求处理、灵活的请求和响应处理机制、连接池优化等特点。本文全方位解析OkHttp,帮助开发者更好地理解和应用这一主流网络请求框架。
《主流网络请求框架 OkHttp 全方位详析》掘金地址
1、OkHttp介绍
OkHttp 是一个开源的 HTTP 客户端库,由 Square 公司开发并维护。它提供了简洁、高效的 API,用于处理网络请求和响应。OkHttp 基于 Java 编写,同时也提供了对 Kotlin 的良好支持。
OkHttp 提供了以下主要特性:
- 简洁易用的 API:OkHttp 提供了简洁而强大的 API,使得进行 HTTP 请求变得非常容易。通过构建 Request 对象和使用 Call 对象来发起同步或异步的网络请求。
- 支持 HTTP/2 和 SPDY:OkHttp 支持现代的 HTTP 协议,包括 HTTP/2 和 SPDY,从而提供更快速和更有效率的网络通信。
- 连接池和缓存:OkHttp 内置了连接池和响应缓存,可以有效地管理和复用网络连接,并提供可配置的缓存机制,减少重复的网络请求。
- 拦截器:OkHttp 提供了拦截器的机制,允许开发者在发送请求和接收响应的过程中进行自定义处理,例如添加公共参数、日志记录等。
- 支持 GZIP 压缩:OkHttp 支持接受和解压 GZIP 压缩的响应数据,减小网络传输的数据量,提升网络性能。
- 适配 Android 平台:OkHttp 在 Android 开发中得到广泛应用,它提供了专门针对 Android 平台的优化,包括性能、安全和稳定性方面的考虑。
2、OkHttp基本使用与请求流程
基本使用
val client = OkHttpClient.Builder() .callTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS) .connectTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS) .readTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS) .writeTimeout(5000L, java.util.concurrent.TimeUnit.MILLISECONDS) .retryOnConnectionFailure(true) .followRedirects(true) .followSslRedirects(true) .cache(null) // 设置缓存 .authenticator(null) // 设置身份验证器 .certificatePinner(null) // 设置证书锁定器 .connectionPool(null) // 设置连接池 .connectionSpecs(listOf()) // 设置连接规范 .cookieJar(null) // 设置 Cookie 管理器 .dispatcher(null) // 设置分发器 .dns(null) // 设置 DNS 解析器 .eventListenerFactory(null) // 设置事件监听器工厂 .proxy(null) // 设置代理 .protocols(listOf()) // 设置支持的协议 .proxyAuthenticator(null) // 设置代理身份验证器 .proxySelector(null) // 设置代理选择器 .socketFactory(null) // 设置 Socket 工厂 .sslSocketFactory(null) // 设置 SSL Socket 工厂 .hostnameVerifier(null) // 设置主机名验证器 .proxy(proxy) // 设置代理 .build() val request = Request.Builder() .url(url) .header("xxx", "xxx") .addHeader("xxx", "xxx") .post(RequestBody.create(null, "XXX")) // 使用 POST 方法并传入请求体,不写默认为 GET 方法 .cacheControl(okhttp3.CacheControl.FORCE_NETWORK) // 设置缓存控制 .tag("custom-tag") // 设置标记 .build() val call = client.newCall(request) // 构造 Call 对象之后就可以同步或异步请求,并处理结果了 // 1、同步 client.newCall(call).execute().use { response -> if (response.isSuccessful){ Log.v("同步请求响应:${response.body?.string()}") }else{ Log.e("同步请求失败") } } // 2、异步 client.newCall(call).enqueue(object : Callback { override fun onFailure(call: Call, e: IOException) { Log.e("异步请求失败: ${e.message}") } override fun onResponse(call: Call, response: Response) { Log.v("异步请求响应:${response.body?.string()}") } })
请求流程
3、分发器Dispatcher
3.1、异步请求分发流程
首先我们要知道异步请求有两个队列,ready 队列和 running 队列,前者用来记录等待执行的请求,后者用来记录正在执行的请求:
// ready 队列 private val readyAsyncCalls = ArrayDeque() // running 队列 private val runningAsyncCalls = ArrayDeque()
我们来看看 enqueue() 方法:
internal fun enqueue(call: AsyncCall) { synchronized(this) { readyAsyncCalls.add(call) // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to // the same host. if (!call.call.forWebSocket) { val existingCall = findExistingCallWithHost(call.host) if (existingCall != null) call.reuseCallsPerHostFrom(existingCall) } } promoteAndExecute() }
其中 synchronized() 代码段中,先将请求加入 ready 队列,随后判断当前请求是否为 WebSocket,如果不是就调用 findExistingCallWithHost() 方法,在 running 和 ready 队列中查找与当前请求 Host 相同的请求,如果找到了,就让相同 Host 请求中的 callsPerHost 变量共享同一个对象,这个对象是 AtomicInteger 类型,用于对请求做一些限制,在下文有解释,可以先往下看。
synchronized() 代码段之外,执行的 promoteAndExecute() 方法是分发器分发异步请求的关键,先来看看源码:
private fun promoteAndExecute(): Boolean { this.assertThreadDoesntHoldLock() val executableCalls = mutableListOf() val isRunning: Boolean synchronized(this) { val i = readyAsyncCalls.iterator() while (i.hasNext()) { val asyncCall = i.next() if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity. i.remove() asyncCall.callsPerHost.incrementAndGet() executableCalls.add(asyncCall) runningAsyncCalls.add(asyncCall) } isRunning = runningCallsCount() > 0 } for (i in 0 until executableCalls.size) { val asyncCall = executableCalls[i] asyncCall.executeOn(executorService) } return isRunning }
synchronized() 代码段中主要完成了检查 ready 队列,并将符合条件可执行的任务添加到 running 队列和 executableCalls 队列中,这个 executableCalls 保存的才是这次要执行的请求,同步代码块之外,就遍历 executableCalls 执行请求。
主要过程可以用下图表示:
其中 ThreadPool 阶段较为重要,是请求效率提升的关键,我们先来回顾一下 Java 线程池的机制:
当一个任务通过 execute(Runnable) 方法添加到线程池时,有两种情况:
- 一、线程数量小于 corePoolSize,则新建线程(核心)来处理被添加的任务。
- 二、线程数量大于或等于 corePoolSize,则新任务被添加到等待队列,若添加失败:
- 线程数量小于 maximumPoolSize,新建线程执行新任务。
- 线程数量等于 maximumPoolSize,使用 RejectedExecutionHandler 拒绝策略。
了解了 Java 线程池机制后,我们来看下方 OkHttp 源码,下方的 ExecutorService 对象就是 OkHttp 中的线程池,其中为了提升请求效率,则不能让新任务被添加到等待队列,而是要新建线程执行新任务,因此将 corePoolSize 设置为0,并传入无界队列 SynchronousQueue 使得每次添加新任务到队列都失败,则会新建线程去执行新任务。
@get:JvmName("executorService") val executorService: ExecutorService get() { if (executorServiceOrNull == null) { executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS, SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false)) } return executorServiceOrNull!! }
3.2、异步分发限制
限制 1:正在运行的异步请求数不超过最大值,默认是64
// 上面代码中: if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
限制 2:正在运行的异步请求中相同 Host 的数量不超过最大值,默认是5,防止单个客户端与服务端连接太多造成巨大压力
// 上面代码中: if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
其中 callsPerHost 是 AtomicInteger 类型数据,记录的是所有请求中有多少个 Host 相同。
3.3、分发器处理同步请求
了解了分发器处理异步请求的过程之后,同步请求就很简单了,同步请求只有一个 running 队列:
// running 队列 private val runningSyncCalls = ArrayDeque()
Dispatcher 中的 execute() 方法仅将 Call 对象添加到 running 队列中:
@Synchronized internal fun executed(call: RealCall) { runningSyncCalls.add(call) } override fun execute(): Response { check(executed.compareAndSet(false, true)) { "Already Executed" } timeout.enter() callStart() try { client.dispatcher.executed(this) // 这里调用的就是上方的 executed 方法 return getResponseWithInterceptorChain() } finally { client.dispatcher.finished(this) } }
其中值得注意的是上述代码块 finally 中的 client.dispatcher.finished(this),在新版本的 OkHttp 中,异步和同步的 finished() 方法都会触发 promoteAndExecute() 方法,遍历检查异步请求中的等待队列:
internal fun finished(call: RealCall) { finished(runningSyncCalls, call) } private fun finished(calls: Deque, call: T) { val idleCallback: Runnable? synchronized(this) { if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!") idleCallback = this.idleCallback } val isRunning = promoteAndExecute() // 异步分发遍历检查 if (!isRunning && idleCallback != null) { idleCallback.run() } }
4、OkHttp拦截器责任链设计模式与功能概述
设计特点:请求从前往后,响应从后往前,根据 interceptors 中的拦截器顺序执行拦截,有点类似于递归的输出形式 。
功能概述:
- 重试重定向拦截器:在交给下一个拦截器之前,负责判断用户是否取消了请求;在获得响应之后,会根据响应码判断是否需要重定向,如果满足条件那么就会重启执行所有拦截器。
- 桥接拦截器:在交给下一个拦截器之前,负责将 HTTP 协议必备的请求头加入其中(如 Host)并添加一些默认的行为(如 GZIP 压缩);在获得响应之后,调用保存 cookie 接口并解析 GZIP 数据。
- 缓存拦截器:在交给下一个拦截器之前,读取并判断是否使用缓存;在获得响应之后判断是否缓存。
- 连接拦截器:在交给下一个拦截器之前,负责找到或新建一个连接,并获得对应的 socket 流;在获得响应之后不进行额外的处理。
- 请求服务器拦截器:与服务器进行通信,向服务器发送请求,解析读取响应数据。
5、OkHttp五大拦截器
5.1、重试重定向拦截器 (RetryAndFollowUpInterceptor)
这个拦截器主要是在请求出错之后重试或根据响应码重定向,先来看看 intercept() 方法中重试部分的源码:
override fun intercept(chain: Interceptor.Chain): Response { ...... ...... try { if (call.isCanceled()) { throw IOException("Canceled") } try { response = realChain.proceed(request) newExchangeFinder = true } catch (e: RouteException) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) { throw e.firstConnectException.withSuppressed(recoveredFailures) } else { recoveredFailures += e.firstConnectException } newExchangeFinder = false continue } catch (e: IOException) { // An attempt to communicate with a server failed. The request may have been sent. if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) { throw e.withSuppressed(recoveredFailures) } else { recoveredFailures += e } newExchangeFinder = false continue } ...... ...... }
其中有异常之后就运行到 catch 代码块内进行重试,并根据 recover() 方法判断是否满足重试限制,如果满足,则将重试次数加一,并 continue 进入到下一次重试。
重试限制
上面提到了执行重试要根据 recover() 方法判断是否满足重试限制,那么执行重试有什么限制呢?让我们来看看 recover() 方法的源码:
private fun recover( e: IOException, call: RealCall, userRequest: Request, requestSendStarted: Boolean ): Boolean { // OkHttpClient 配置不重试 // The application layer has forbidden retries. if (!client.retryOnConnectionFailure) return false // 1、如果是 IO 异常(非 http2 中断异常)表示请求可能发出 // 2、如果请求体配置为只能被使用一次(默认为 false,可重复使用) // We can't send the request body again. if (requestSendStarted && requestIsOneShot(e, userRequest)) return false // 异常不重试:协议异常、IO 中断异常(除 Socket 读写超时之外),ssl 认证异常 // This exception is fatal. if (!isRecoverable(e, requestSendStarted)) return false // 判断是否有更多的路线,没有则不重试,即时前面的条件都满足 // No more routes to attempt. if (!call.retryAfterFailure()) return false // For failure recovery, use the same route selector with a new connection. return true }
以上限制条件按顺序可表示为下图:
重定向规则
执行完上述重试代码块之后,会判断是否需要重定向,那么在 RetryAndFollowUpInterceptor 中,什么情况才会触发重定向呢?我们来看看重试重定向拦截器 intercept() 方法中重定向部分的源码:
override fun intercept(chain: Interceptor.Chain): Response { ...... ...... val exchange = call.interceptorScopedExchange val followUp = followUpRequest(response, exchange) if (followUp == null) { if (exchange != null && exchange.isDuplex) { call.timeoutEarlyExit() } closeActiveExchange = false return response } val followUpBody = followUp.body if (followUpBody != null && followUpBody.isOneShot()) { closeActiveExchange = false return response } response.body?.closeQuietly() if (++followUpCount > MAX_FOLLOW_UPS) { throw ProtocolException("Too many follow-up requests: $followUpCount") } request = followUp // 这里用 RetryAndFollowUpInterceptor 内部成员 request 接收 followUp 之后,又会运行到外部循环,进行重定向 priorResponse = response ...... ...... }
其中,最关键的一句是val followUp = followUpRequest(response, exchange),它的返回值决定了 followUp 是否为 null,如果为 null,则不会重定向。
那么,让我们来看看 followUpRequest() 方法的源码:
private fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? { val route = exchange?.connection?.route() val responseCode = userResponse.code val method = userResponse.request.method when (responseCode) { HTTP_PROXY_AUTH -> { ...... } HTTP_UNAUTHORIZED -> ...... HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> { ...... } HTTP_CLIENT_TIMEOUT -> { ...... } HTTP_UNAVAILABLE -> { ...... } HTTP_MISDIRECTED_REQUEST -> { ...... } else -> return null } }
此方法中主要是根据响应码来判断是否满足重定向条件,由上到下依次如下表格所示:
响应码 说明 执行重定向要满足的条件 HTTP_PROXY_AUTH(407) 代理需要授权,如付费代理,需要验证身份 通过 proxyAuthenticator 获得到了 Request。
例如添加 Proxy-Authorization 请求头HTTP_UNAUTHORIZED(401) 服务器需要授权,如某些接口需要登录才能使用(不安全,基本上没用了) 通过 authenticator 获得到了 Request。
例如添加 Authorization 请求头HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER(3XX) 重定向响应 OkHttpClient 配置了允许重定向(OkHttpClient.Builder().followRedirects(true),默认为 true) HTTP_CLIENT_TIMEOUT(408) 请求超时 1、OkHttpClient 配置了重试(默认允许重试)
2、本次请求的结果不是响应408的重试结果,即不能连续两次响应408,否则第二次不再重试
3、服务器未响应 Retry-After 或者响应 Retry-After:0HTTP_UNAVAILABLE(503) 服务不可用 1、本次请求结果不是响应503的重试结果,和上述408相似
2、服务器明确响应 Rerty-After:0,即立即重试HTTP_MISDIRECTED_REQUEST(421) 从当前客户端所在的 IP 地址到服务器的连接数超过了服务器许可的最大范围 自动再次使用另一个连接对象发起请求 5.2、桥接拦截器 (BridgeInterceptor)
桥接拦截器主要是配置了默认请求头,比如 Host 是 HTTP 中必备的请求头,但是我们平时并不会给 request 配置 Host 请求头,然而我们依然能用,原因就是在桥接拦截器为我们自动配置了 Host 请求头。
来看看 BridgeInterceptor 中 intercept 源码:
override fun intercept(chain: Interceptor.Chain): Response { val userRequest = chain.request() val requestBuilder = userRequest.newBuilder() val body = userRequest.body if (body != null) { val contentType = body.contentType() if (contentType != null) { requestBuilder.header("Content-Type", contentType.toString()) } val contentLength = body.contentLength() if (contentLength != -1L) { requestBuilder.header("Content-Length", contentLength.toString()) requestBuilder.removeHeader("Transfer-Encoding") } else { requestBuilder.header("Transfer-Encoding", "chunked") requestBuilder.removeHeader("Content-Length") } } if (userRequest.header("Host") == null) { requestBuilder.header("Host", userRequest.url.toHostHeader()) } if (userRequest.header("Connection") == null) { requestBuilder.header("Connection", "Keep-Alive") } // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing // the transfer stream. var transparentGzip = false if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) { transparentGzip = true requestBuilder.header("Accept-Encoding", "gzip") } val cookies = cookieJar.loadForRequest(userRequest.url) if (cookies.isNotEmpty()) { requestBuilder.header("Cookie", cookieHeader(cookies)) } if (userRequest.header("User-Agent") == null) { requestBuilder.header("User-Agent", userAgent) } val networkResponse = chain.proceed(requestBuilder.build()) cookieJar.receiveHeaders(userRequest.url, networkResponse.headers) val responseBuilder = networkResponse.newBuilder() .request(userRequest) if (transparentGzip && "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) && networkResponse.promisesBody()) { val responseBody = networkResponse.body if (responseBody != null) { val gzipSource = GzipSource(responseBody.source()) val strippedHeaders = networkResponse.headers.newBuilder() .removeAll("Content-Encoding") .removeAll("Content-Length") .build() responseBuilder.headers(strippedHeaders) val contentType = networkResponse.header("Content-Type") responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer())) } } return responseBuilder.build() }
其中除了配置请求头,还有比较重要的三个作用:
- 发出请求之前执行val cookies = cookieJar.loadForRequest(userRequest.url)加载 cookies。
- 获取响应之后执行cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)保存 cookies。
- 如果服务端响应的数据是 GZIP 格式压缩的,则执行val gzipSource = GzipSource(responseBody.source())进行 GZIP 解压。
5.3、缓存拦截器 (CacheInterceptor)
在了解缓存拦截器之前,先来简单了解下 HTTP 的缓存机制,主要分为以下两种:
- 强缓存:客户端不会将请求发送给服务器,而是将本地缓存响应出去。强缓存是利用 HTTP 的响应头中的 Expires 或者 Cache-Control两个字段控制的,用来表示资源的缓存时间。
- 协商缓存:客户端会将请求发送给服务器。服务器根据请求头中的 Last-Modify/If-Modify-Since 或 Etag/If-None-Match 来判断协商结果,如果协商成功,即资源尚未改变,则返回304,告诉客户端可以从缓存中加载资源,如果不成功,则返回新的资源。
了解完以上机制之后,我们可以看看缓存拦截器中是怎么处理本地缓存和响应数据的,源码:
override fun intercept(chain: Interceptor.Chain): Response { val call = chain.call() val cacheCandidate = cache?.get(chain.request()) val now = System.currentTimeMillis() val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() val networkRequest = strategy.networkRequest val cacheResponse = strategy.cacheResponse cache?.trackResponse(strategy) val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE if (cacheCandidate != null && cacheResponse == null) { // The cache candidate wasn't applicable. Close it. cacheCandidate.body?.closeQuietly() } // If we're forbidden from using the network and the cache is insufficient, fail. if (networkRequest == null && cacheResponse == null) { ...... } // If we don't need the network, we're done. if (networkRequest == null) { ...... } if (cacheResponse != null) { listener.cacheConditionalHit(call, cacheResponse) } else if (cache != null) { listener.cacheMiss(call) } var networkResponse: Response? = null try { networkResponse = chain.proceed(networkRequest) } finally { // If we're crashing on I/O or otherwise, don't leak the cache body. if (networkResponse == null && cacheCandidate != null) { cacheCandidate.body?.closeQuietly() } } // If we have a cache response too, then we're doing a conditional get. if (cacheResponse != null) { if (networkResponse?.code == HTTP_NOT_MODIFIED) { ...... } else { cacheResponse.body?.closeQuietly() } } val response = networkResponse!!.newBuilder() .cacheResponse(stripBody(cacheResponse)) .networkResponse(stripBody(networkResponse)) .build() if (cache != null) { ...... } return response }
其中最关键的代码是:
val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute() val networkRequest = strategy.networkRequest val cacheResponse = strategy.cacheResponse
获取缓存策略 strategy,并根据其中 networkRequest 和 cacheResponse 变量的值进行后续操作,处理规则如下表格所示:
networkRequest cacheResponse 处理方式 Null Not Null 直接使用缓存(强缓存) Not Null Null 向服务器发起请求(协商缓存或普通请求) Null Null 要求使用缓存,但是没有缓存,则 OkHttp 直接在本地创建一个响应,返回504 Not Null Not Null 发起请求,若得到响应为304(表示资源未修改),则更新缓存并返回(协商缓存) 5.4、连接拦截器 (ConnectInterceptor)
连接拦截器主要是获取一个连接,如果无法用缓存,则会用这个连接进行网络请求,ConnectInterceptor 的 intercept() 方法代码只有四行:
override fun intercept(chain: Interceptor.Chain): Response { val realChain = chain as RealInterceptorChain val exchange = realChain.call.initExchange(chain) val connectedChain = realChain.copy(exchange = exchange) return connectedChain.proceed(realChain.request) }
学习连接拦截器主要有两个要点,新建连接和连接池:
新建连接
新建连接主要是通过执行 val exchange = realChain.call.initExchange(chain) 这行代码,进入 initExchange() 方法内部:
internal fun initExchange(chain: RealInterceptorChain): Exchange { synchronized(this) { check(expectMoreExchanges) { "released" } check(!responseBodyOpen) check(!requestBodyOpen) } val exchangeFinder = this.exchangeFinder!! val codec = exchangeFinder.find(client, chain) val result = Exchange(this, eventListener, exchangeFinder, codec) this.interceptorScopedExchange = result this.exchange = result synchronized(this) { this.requestBodyOpen = true this.responseBodyOpen = true } if (canceled) throw IOException("Canceled") return result }
其中比较重要的是 val codec = exchangeFinder.find(client, chain),这里的 exchangeFinder 用的是重试重定向拦截器中 call.enterNetworkInterceptorExchange(request, newExchangeFinder) 这句代码传入的 newExchangeFinder,find() 方法内部判断请求的 HTTP 协议版本并返回连接。
连接池
连接池就是一个保存连接的容器,提供了 get() 和 put() 方法取出连接和保存连接,可以在创建 OkHttpClient 时自定义连接池:
val cp = ConnectionPool(5, 5, TimeUnit.MINUTES) OkHttpClient.Builder().connectionPool(cp)
ConnectionPool 构造方法中传入的三个参数分别为:
- maxIdleConnections 连接池最大允许的空闲连接数
- keepAliveDuration 连接最大允许的空闲时间
- timeUnit 时间单位
为什么要设置最大允许空闲连接数和最大允许空闲时间呢,这是为了实现连接池中的重要机制:cleanUp(),下面我们来了解一下这个机制:
fun cleanup(now: Long): Long { var inUseConnectionCount = 0 var idleConnectionCount = 0 var longestIdleConnection: RealConnection? = null var longestIdleDurationNs = Long.MIN_VALUE // Find either a connection to evict, or the time that the next eviction is due. for (connection in connections) { synchronized(connection) { // If the connection is in use, keep searching. if (pruneAndGetAllocationCount(connection, now) > 0) { inUseConnectionCount++ } else { idleConnectionCount++ // If the connection is ready to be evicted, we're done. val idleDurationNs = now - connection.idleAtNs if (idleDurationNs > longestIdleDurationNs) { longestIdleDurationNs = idleDurationNs longestIdleConnection = connection } else { Unit } } } } when { longestIdleDurationNs >= this.keepAliveDurationNs || idleConnectionCount > this.maxIdleConnections -> { // We've chosen a connection to evict. Confirm it's still okay to be evict, then close it. val connection = longestIdleConnection!! synchronized(connection) { if (connection.calls.isNotEmpty()) return 0L // No longer idle. if (connection.idleAtNs + longestIdleDurationNs != now) return 0L // No longer oldest. connection.noNewExchanges = true connections.remove(longestIdleConnection) } connection.socket().closeQuietly() if (connections.isEmpty()) cleanupQueue.cancelAll() // Clean up again immediately. return 0L } idleConnectionCount > 0 -> { // A connection will be ready to evict soon. return keepAliveDurationNs - longestIdleDurationNs } inUseConnectionCount > 0 -> { // All connections are in use. It'll be at least the keep alive duration 'til we run // again. return keepAliveDurationNs } else -> { // No connections, idle or in use. return -1 } } }
private val cleanupTask = object : Task("$okHttpName ConnectionPool") { override fun runOnce() = cleanup(System.nanoTime()) }
fun put(connection: RealConnection) { connection.assertThreadHoldsLock() connections.add(connection) cleanupQueue.schedule(cleanupTask) }
从这三段代码中可以看到连接池内部有一个 cleanupTask 变量来保存定时清理任务,而每次调用连接池的 put() 方法时,都会启动定时清理任务,内部的清理逻辑就是根据 ConnectionPool 的构造函数中传入的三个参数决定的。
5.5、请求服务拦截器 (CallServerInterceptor)
这个拦截器作用主要是向服务器发起请求并生成 Response 对象给客户端。intercept() 方法主要源码:
override fun intercept(chain: Interceptor.Chain): Response { ...... ...... try { // 1) exchange.writeRequestHeaders(request) // 2) if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) { ...... ...... } else { exchange.noRequestBody() } if (requestBody == null || !requestBody.isDuplex()) { exchange.finishRequest() } } catch (e: IOException) { if (e is ConnectionShutdownException) { throw e // No request was sent so there's no response to read. } if (!exchange.hasFailure) { throw e // Don't attempt to read the response; we failed to send the request. } sendRequestException = e } // 3) try { if (responseBuilder == null) { responseBuilder = exchange.readResponseHeaders(expectContinue = false)!! if (invokeStartEvent) { exchange.responseHeadersStart() invokeStartEvent = false } } var response = responseBuilder .request(request) .handshake(exchange.connection.handshake()) .sentRequestAtMillis(sentRequestMillis) .receivedResponseAtMillis(System.currentTimeMillis()) .build() var code = response.code ...... ...... } catch (e: IOException) { if (sendRequestException != null) { sendRequestException.addSuppressed(e) throw sendRequestException } throw e } }
主要可以分为以下三个部分:
1)发送请求头
exchange 就是 ConnectInterceptor 中获取的连接对象,用这个对象调用 exchange.writeRequestHeaders(request) 方法向服务器写入请求头。
2)判断是否需要发送请求体,GET 请求和 HEAD 请求直接跳过这步
发送完请求头之后,如果是 POST 请求或其他需要请求体的请求,就会发送请求体,其中 if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) 就是来进行这步判断。
3)读取结果并生成 Response 对象
之后就根据响应头和响应体以及 HTTP 协议版本生成对应的 Response 对象并返回给用户使用。
6、自定义拦截器
自定义拦截器就是我们在创建 OkHttpClient 时可以自定义添加的拦截器,可以用来打印日志或者查看网络状态等。
OkHttpClient.Builder() .addInterceptor {} .addNetworkInterceptor {}
值得注意的是,在自定义拦截器的内部一定要执行 chain.proceed(Request) 方法,因为我们前面提到了,拦截器是责任链设计模式,一层一层传递,如果不调用 proceed() 方法,则请求链会断开,请求也就不能正确执行。
我们知道不论同步还是异步请求,都会调用到 getResponseWithInterceptorChain() 方法获取拦截器链完成请求:
internal fun getResponseWithInterceptorChain(): Response { // Build a full stack of interceptors. val interceptors = mutableListOf() interceptors += client.interceptors // 自定义应用拦截器 interceptors += RetryAndFollowUpInterceptor(client) interceptors += BridgeInterceptor(client.cookieJar) interceptors += CacheInterceptor(client.cache) interceptors += ConnectInterceptor if (!forWebSocket) { interceptors += client.networkInterceptors // 自定义网络拦截器 } interceptors += CallServerInterceptor(forWebSocket) ...... ...... }
根据此方法中拦截器的添加顺序,我们添加的自定义拦截器有以下特点:
- 应用拦截器是第一个得到请求,最后一个得到响应,因此我们可以在这里对 request 进行操作。
- 网络拦截器是倒数第二个得到请求,第二个得到响应,在这个拦截器中得到的是最后真正发给服务器的 request,得到响应后可以对 response 进行操作。
👉 以上就是对 OkHttp 的全方位解析,文中如有表述不当,欢迎指正。
还没有评论,来说两句吧...