kotlin协程——>基础、取消与超时
Kotlin使用掛起函數(shù)為異步操作,使用kotlinx.coroutines中的launch、async
1. 第?個協(xié)程程序
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后臺啟動?個新的協(xié)程并繼續(xù)
delay(1000L) // ?阻塞的等待 1 秒鐘(默認(rèn)時間單位是毫秒)
println("World!") // 在延遲后打印輸出
}
println("Hello,") // 協(xié)程已在等待時主線程還在繼續(xù)
Thread.sleep(2000L) // 阻塞主線程 2 秒鐘來保證 JVM 存活
}
代碼運行的結(jié)果
Hello, World!
本質(zhì)上,協(xié)程是輕量級的線程。它們在某些 CoroutineScope 上下?中與 launch 協(xié)程構(gòu)建器 ?起啟 動。這?我們在 GlobalScope 中啟動了?個新的協(xié)程,這意味著新協(xié)程的?命周期只受整個應(yīng)?程序 的?命周期限制。 可以將 GlobalScope.launch { …… } 替換為 thread { …… } ,并將 delay(……) 替換為 Thread.sleep(……) 達到同樣?的。試試看(不要忘記導(dǎo)? kotlin.concurrent.thread )。 — — — — — — — — — 協(xié)程基礎(chǔ) 第?個協(xié)程程序 205 如果你?先將 GlobalScope.launch 替換為 thread ,編譯器會報以下錯誤:
Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function
這是因為 delay 是?個特殊的 掛起函數(shù) ,它不會造成線程阻塞,但是會 掛起 協(xié)程,并且只能在協(xié)程中 使?。
2. 橋接阻塞與?阻塞的世界
第?個?例在同?段代碼中混?了 ?阻塞的 delay(……) 與 阻塞的 Thread.sleep(……) 。這容易 讓我們記混哪個是阻塞的、哪個是?阻塞的。讓我們顯式使? runBlocking 協(xié)程構(gòu)建器來阻塞:
import kotlinx.coroutines.*
fun main() {
GlobalScope.launch { // 在后臺啟動?個新的協(xié)程并繼續(xù)
delay(1000L)
println("World!")
}
println("Hello,") // 主線程中的代碼會?即執(zhí)?
runBlocking { // 但是這個表達式阻塞了主線程
delay(2000L) // ……我們延遲 2 秒來保證 JVM 的存活
}
}
結(jié)果是相似的,但是這些代碼只使?了?阻塞的函數(shù) delay。調(diào)?了 runBlocking 的主線程會?直 阻塞 直到 runBlocking 內(nèi)部的協(xié)程執(zhí)?完畢。
這個?例可以使?更合乎慣?法的?式重寫,使? runBlocking 來包裝 main 函數(shù)的執(zhí)?:
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> { // 開始執(zhí)?主協(xié)程
GlobalScope.launch { // 在后臺啟動?個新的協(xié)程并繼續(xù)
delay(1000L)
println("World!")
}
println("Hello,") // 主協(xié)程在這?會?即執(zhí)?
delay(2000L) // 延遲 2 秒來保證 JVM 存活
}
這?的 runBlocking { …… } 作為?來啟動頂層主協(xié)程的適配器。我們顯式指定了其返回 類型 Unit,因為在 Kotlin 中 main 函數(shù)必須返回 Unit 類型。
這也是為掛起函數(shù)編寫單元測試的?種?式:
class MyTest {
@Test
fun testMySuspendingFunction() = runBlocking<Unit> {
// 這?我們可以使?任何喜歡的斷??格來使?掛起函數(shù)
}
}
延遲?段時間來等待另?個協(xié)程運?并不是?個好的選擇。讓我們顯式(以?阻塞?式)等待所啟動的 后臺 Job 執(zhí)?結(jié)束:
val job = GlobalScope.launch { // 啟動?個新協(xié)程并保持對這個作業(yè)的引?
delay(1000L)
println("World!")
}
println("Hello,")
job.join() // 等待直到?協(xié)程執(zhí)?結(jié)束
現(xiàn)在,結(jié)果仍然相同,但是主協(xié)程與后臺作業(yè)的持續(xù)時間沒有任何關(guān)系了。好多了。
3. 結(jié)構(gòu)化的并發(fā)
協(xié)程的實際使?還有?些需要改進的地?。當(dāng)我們使? GlobalScope.launch 時,我們會創(chuàng)建?個 頂層協(xié)程。雖然它很輕量,但它運?時仍會消耗?些內(nèi)存資源。如果我們忘記保持對新啟動的協(xié)程的引 ?,它還會繼續(xù)運?。如果協(xié)程中的代碼掛起了會怎么樣(例如,我們錯誤地延遲了太?時間),如果我們 啟動了太多的協(xié)程并導(dǎo)致內(nèi)存不?會怎么樣?必須?動保持對所有已啟動協(xié)程的引?并 join 之很容易 出錯。 有?個更好的解決辦法。我們可以在代碼中使?結(jié)構(gòu)化并發(fā)。我們可以在執(zhí)?操作所在的指定作?域內(nèi) 啟動協(xié)程,?不是像通常使?線程(線程總是全局的)那樣在 GlobalScope 中啟動。 在我們的?例中,我們使? runBlocking 協(xié)程構(gòu)建器將 main 函數(shù)轉(zhuǎn)換為協(xié)程。包括 runBlocking 在內(nèi)的每個協(xié)程構(gòu)建器都將 CoroutineScope 的實例添加到其代碼塊所在的作?域中。我們可以在這 個作?域中啟動協(xié)程??需顯式 join 之,因為外部協(xié)程(?例中的 runBlocking )直到在其作?域 中啟動的所有協(xié)程都執(zhí)?完畢后才會結(jié)束。因此,可以將我們的?例簡化為:
import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch { // 在 runBlocking 作?域中啟動?個新協(xié)程
delay(1000L)
println("World!")
}
println("Hello,")
}
4.作?域構(gòu)建器
除了由不同的構(gòu)建器提供協(xié)程作?域之外,還可以使? coroutineScope 構(gòu)建器聲明??的作?域。它 會創(chuàng)建?個協(xié)程作?域并且在所有已啟動?協(xié)程執(zhí)?完畢之前不會結(jié)束。 runBlocking 與 coroutineScope 可能看起來很類似,因為它們都會等待其協(xié)程體以及所有?協(xié)程結(jié) 束。主要區(qū)別在于,runBlocking ?法會阻塞當(dāng)前線程來等待,? coroutineScope 只是掛起,會釋放底 層線程?于其他?途。由于存在這點差異,runBlocking 是常規(guī)函數(shù),? coroutineScope 是掛起函數(shù)。 可以通過以下?例來演?:
import kotlinx.coroutines.*
fun main() = runBlocking { // this: CoroutineScope
launch {
delay(200L)
println("Task from runBlocking")
}
coroutineScope { // 創(chuàng)建?個協(xié)程作?域
launch {
delay(500L)
println("Task from nested launch")
}
delay(100L)
println("Task from coroutine scope") // 這??會在內(nèi)嵌 launch 之前輸出
}
println("Coroutine scope is over") // 這??在內(nèi)嵌 launch 執(zhí)?完畢后才輸出
}
請注意,(當(dāng)?shù)却齼?nèi)嵌 launch 時)緊挨“Task from coroutine scope”消息之后,就會執(zhí)?并輸出“Task from runBlocking”?盡管 coroutineScope 尚未結(jié)束。
5. 提取函數(shù)重構(gòu)
我們來將 launch { …… } 內(nèi)部的代碼塊提取到獨?的函數(shù)中。當(dāng)你對這段代碼執(zhí)?“提取函數(shù)”重構(gòu) 時,你會得到?個帶有 suspend 修飾符的新函數(shù)。這是你的第?個掛起函數(shù)。在協(xié)程內(nèi)部可以像普通 函數(shù)?樣使?掛起函數(shù),不過其額外特性是,同樣可以使?其他掛起函數(shù)(如本例中的 delay )來掛 起協(xié)程的執(zhí)?。
import kotlinx.coroutines.*
fun main() = runBlocking {
launch { doWorld() }
println("Hello,")
}
// 這是你的第?個掛起函數(shù)
suspend fun doWorld() {
delay(1000L)
println("World!")
}
但是如果提取出的函數(shù)包含?個在當(dāng)前作?域中調(diào)?的協(xié)程構(gòu)建器的話,該怎么辦?在這種情況下,所 提取函數(shù)上只有 suspend 修飾符是不夠的。為 CoroutineScope 寫?個 doWorld 擴展?法是其 中?種解決?案,但這可能并?總是適?,因為它并沒有使 API 更加清晰。慣?的解決?案是要么顯式 將 CoroutineScope 作為包含該函數(shù)的類的?個字段,要么當(dāng)外部類實現(xiàn)了 CoroutineScope 時 隱式取得。作為最后的?段,可以使? CoroutineScope(coroutineContext),不過這種?法結(jié)構(gòu)上不安 全,因為你不能再控制該?法執(zhí)?的作?域。只有私有 API 才能使?這個構(gòu)建器。
6.全局協(xié)程像守護線程
以下代碼在 GlobalScope 中啟動了?個?期運?的協(xié)程,該協(xié)程每秒輸出“I'm sleeping”兩次,之后在 主函數(shù)中延遲?段時間后返回。
GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 在延遲后退出
你可以運?這個程序并看到它輸出了以下三?后終?:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ...
在 GlobalScope 中啟動的活動協(xié)程并不會使進程保活。它們就像守護線程
7.取消協(xié)程的執(zhí)行
在?個?時間運?的應(yīng)?程序中,你也許需要對你的后臺協(xié)程進?細(xì)粒度的控制。?如說,?個??也 許關(guān)閉了?個啟動了協(xié)程的界?,那么現(xiàn)在協(xié)程的執(zhí)?結(jié)果已經(jīng)不再被需要了,這時,它應(yīng)該是可以被 取消的。該 launch 函數(shù)返回了?個可以被?來取消運?中的協(xié)程的 Job:
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延遲?段時間
println("main: I'm tired of waiting!")
job.cancel() // 取消該作業(yè)
job.join() // 等待作業(yè)執(zhí)?結(jié)束
println("main: Now I can quit.")
程序執(zhí)?后的輸出如下:
job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.
?旦 main 函數(shù)調(diào)?了 job.cancel ,我們在其它的協(xié)程中就看不到任何輸出,因為它被取消了。這? 也有?個可以使 Job 掛起的函數(shù) cancelAndJoin 它合并了對 cancel 以及 join 的調(diào)?。
8.取消是協(xié)作的
協(xié)程的取消是 協(xié)作 的。?段協(xié)程代碼必須協(xié)作才能被取消。所有 kotlinx.coroutines 中的掛起 函數(shù)都是 可被取消的 。它們檢查協(xié)程的取消,并在取消時拋出 CancellationException。然?,如果協(xié) 程正在執(zhí)?計算任務(wù),并且沒有檢查取消的話,那么它是不能被取消的,就如如下?例代碼所?:
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // ?個執(zhí)?計算的循環(huán),只是為了占? CPU
// 每秒打印消息兩次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待?段時間
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消?個作業(yè)并且等待它結(jié)束
println("main: Now I can quit.")
運??例代碼,并且我們可以看到它連續(xù)打印出了“I'm sleeping”,甚?在調(diào)?取消后,作業(yè)仍然執(zhí)?了 五次循環(huán)迭代并運?到了它結(jié)束為?。
9.使計算代碼可取消
我們有兩種?法來使執(zhí)?計算的代碼可以被取消。第?種?法是定期調(diào)?掛起函數(shù)來檢查取消。對于這 種?的 yield 是?個好的選擇。另?種?法是顯式的檢查取消狀態(tài)。讓我們試試第?種?法。 將前?個?例中的 while (i < 5) 替換為 while (isActive) 并重新運?它。
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // 可以被取消的計算循環(huán)
// 每秒打印消息兩次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待?段時間
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消該作業(yè)并等待它結(jié)束
println("main: Now I can quit.")
你可以看到,現(xiàn)在循環(huán)被取消了。isActive 是?個可以被使?在 CoroutineScope 中的擴展屬性。
10.在 finally 中釋放資源
我們通常使?如下的?法處理在被取消時拋出 CancellationException 的可被取消的掛起函數(shù)。?如 說,try {……} finally {……} 表達式以及 Kotlin 的 use 函數(shù)?般在協(xié)程被取消的時候執(zhí)?它們 的終結(jié)動作:
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // 延遲?段時間
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消該作業(yè)并且等待它結(jié)束
println("main: Now I can quit.")
join 和 cancelAndJoin 等待了所有的終結(jié)動作執(zhí)?完畢,所以運??例得到了下?的輸出:
job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! job: I'm running finally main: Now I can quit.
11. 運?不能取消的代碼塊
在前?個例?中任何嘗試在 finally 塊中調(diào)?掛起函數(shù)的?為都會拋出 CancellationException,因 為這?持續(xù)運?的代碼是可以被取消的。通常,這并不是?個問題,所有良好的關(guān)閉操作(關(guān)閉?個? 件、取消?個作業(yè)、或是關(guān)閉任何?種通信通道)通常都是?阻塞的,并且不會調(diào)?任何掛起函數(shù)。然?, 在真實的案例中,當(dāng)你需要掛起?個被取消的協(xié)程,你可以將相應(yīng)的代碼包裝在 withContext(NonCancellable) {……} 中,并使? withContext 函數(shù)以及 NonCancellable 上 下?,?如下?例所?:
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延遲?段時間
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消該作業(yè)并等待它結(jié)束
println("main: Now I can quit.")
12. 超時
在實踐中絕?多數(shù)取消?個協(xié)程的理由是它有可能超時。當(dāng)你?動追蹤?個相關(guān) Job 的引?并啟動了 ?個單獨的協(xié)程在延遲后取消追蹤,這?已經(jīng)準(zhǔn)備好使? withTimeout 函數(shù)來做這件事。來看看?例代碼:
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
運?后得到如下輸出:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
withTimeout 拋出了 TimeoutCancellationException ,它是 CancellationException 的?類。 我們之前沒有在控制臺上看到堆棧跟蹤信息的打印。這是因為在被取消的協(xié)程中 CancellationException 被認(rèn)為是協(xié)程執(zhí)?結(jié)束的正常原因。然?,在這個?例中我們在 main 函數(shù)中正確地使?了 withTimeout
由于取消只是?個例外,所有的資源都使?常?的?法來關(guān)閉。如果你需要做?些各類使?超時的特別 的額外操作,可以使?類似 withTimeout 的 withTimeoutOrNull 函數(shù),并把這些會超時的代碼包裝在 try {...} catch (e: TimeoutCancellationException) {...} 代碼塊中,? withTimeoutOrNull 通過返回 null 來進?超時操作,從?替代拋出?個異常:
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // 在它運?得到結(jié)果之前取消它
}
println("Result is $result")
運?這段代碼時不再拋出異常:
I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Result is null
總結(jié)
以上是生活随笔為你收集整理的kotlin协程——>基础、取消与超时的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如虎添翼!(如虎添翼的意思!)
- 下一篇: Mysql系列(十二)—— 索引下推优化