Отмена корутин и тайм-ауты

Отмена позволяет остановить корутину до её завершения. Она прекращает работу, которая больше не нужна: например, когда пользователь закрывает окно или переходит на другой экран пользовательского интерфейса, пока корутина всё ещё выполняется. Отмену также можно использовать, чтобы раньше освобождать ресурсы и не позволять корутине обращаться к объектам после их освобождения.

Отмену можно использовать, чтобы останавливать долгоживущие корутины, которые продолжают производить значения даже после того, как другим корутинам эти значения больше не нужны, например в конвейерах.

Отмена работает через дескриптор Job, который представляет жизненный цикл корутины и её отношения с родительскими и дочерними корутинами. Job позволяет проверять, активна ли корутина, и отменять её вместе с дочерними корутинами в соответствии со структурированным параллелизмом.

Отмена корутин

Корутина отменяется, когда для её дескриптора Job вызывается функция cancel(). Функции-конструкторы корутин, такие как .launch(), возвращают Job. Функция .async() возвращает Deferred, который реализует Job и поддерживает такое же поведение при отмене.

Функцию cancel() можно вызвать вручную. Она также может быть вызвана автоматически через распространение отмены, когда отменяется родительская корутина.

Когда корутина отменяется, при следующей проверке отмены она выбрасывает CancellationException. Подробнее о том, как и когда это происходит, читайте в разделе Точки приостановки и отмена.

Функцию awaitCancellation() можно использовать, чтобы приостановить корутину до её отмены.

Вот пример ручной отмены корутин:

import kotlinx.coroutines.*
import kotlin.time.Duration

//sampleStart
suspend fun main() {
    withContext(Dispatchers.Default) {
        // Used as a signal that the coroutine has started running
        val job1Started = CompletableDeferred<Unit>()

        val job1: Job = launch {

            println("The coroutine has started")

            // Completes the CompletableDeferred,
            // signaling that the coroutine has started running
            job1Started.complete(Unit)
            try {
                // Suspends indefinitely
                // Without cancellation, this call would never return
                delay(Duration.INFINITE)
            } catch (e: CancellationException) {
                println("The coroutine was canceled: $e")

                // Always rethrow cancellation exceptions!
                throw e
            }
            println("This line will never be executed")
        }

        // Waits for job1 to start before canceling it
        job1Started.await()

        // Cancels the coroutine, so delay() throws a CancellationException
        job1.cancel()

        // async returns a Deferred handle, which inherits from Job
        val job2 = async {
            // If the coroutine is canceled before its body starts executing,
            // this line may not be printed
            println("The second coroutine has started")

            try {
                // Equivalent to delay(Duration.INFINITE)
                // Suspends until this coroutine is canceled
                awaitCancellation()

            } catch (e: CancellationException) {
                println("The second coroutine was canceled")
                throw e
            }
        }
        job2.cancel()
    }
    // Coroutine builders such as withContext() or coroutineScope()
    // wait for all child coroutines to complete,
    // even when the children are canceled
    println("All coroutines have completed")
}
//sampleEnd

В этом примере CompletableDeferred используется как сигнал о том, что корутина начала выполняться. Корутина вызывает complete() при старте, а await() возвращает результат только после завершения этого CompletableDeferred. Благодаря этому отмена происходит только после того, как корутина начала выполняться. У корутины, созданной через .async(), такой проверки нет, поэтому её можно отменить до того, как код внутри её блока начнёт выполняться.

Внимание. Перехват CancellationException может нарушить распространение отмены. Если вам необходимо его перехватить, выбросьте его повторно, чтобы отмена корректно распространилась по иерархии корутин.

Подробнее см. в разделе Обработка исключений в корутинах.

Распространение отмены

Структурированный параллелизм гарантирует, что отмена корутины также отменяет все её дочерние корутины. Это не позволяет дочерним корутинам продолжать работу после остановки родительской корутины.

Например:

import kotlinx.coroutines.*
import kotlin.time.Duration

//sampleStart
suspend fun main() {
    withContext(Dispatchers.Default) {
        // Used as a signal that the child coroutines have been launched
        val childrenLaunched = CompletableDeferred<Unit>()

        // Launches two child coroutines
        val parentJob = launch {
            launch {
                println("Child coroutine 1 has started running")
                try {
                    awaitCancellation()
                } finally {
                    println("Child coroutine 1 has been canceled")
                }
            }
            launch {
                println("Child coroutine 2 has started running")
                try {
                    awaitCancellation()
                } finally {
                    println("Child coroutine 2 has been canceled")
                }
            }
            // Completes the CompletableDeferred,
            // signaling that the child coroutines have been launched
            childrenLaunched.complete(Unit)
        }
        // Waits for the parent coroutine to signal that it has launched
        // all of its children
        childrenLaunched.await()

        // Cancels the parent coroutine, which cancels all its children
        parentJob.cancel()
    }
}
//sampleEnd

В этом примере каждая дочерняя корутина использует блок finally, поэтому код внутри него выполняется при отмене корутины. Здесь CompletableDeferred сигнализирует, что дочерние корутины запущены до отмены, но не гарантирует, что они успели начать выполняться. Если их отменить первыми, на экран ничего не будет выведено.

Как заставить корутины реагировать на отмену

В Kotlin отмена корутин кооперативна. Это значит, что корутины реагируют на отмену только тогда, когда взаимодействуют с механизмом отмены: приостанавливаются или явно проверяют отмену.

В этом разделе показано, как создавать отменяемые корутины.

Точки приостановки и отмена

Когда корутина отменяется, она продолжает выполняться, пока не достигнет места в коде, где может приостановиться. Такое место называется точкой приостановки. Если корутина приостанавливается в этой точке, suspend-функция проверяет, была ли корутина отменена. Если была, корутина останавливается и выбрасывает CancellationException.

Вызов suspend-функции является точкой приостановки, но он не всегда действительно приостанавливает выполнение. Например, при ожидании результата Deferred корутина приостанавливается только в том случае, если этот Deferred ещё не завершён.

Ниже приведён пример с обычными suspend-функциями, которые приостанавливают выполнение, позволяя корутине проверить отмену и остановиться:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.channels.Channel
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration

suspend fun main() {
    withContext(Dispatchers.Default) {
        val childJobs = listOf(
            launch {
                // Suspends until canceled
                awaitCancellation()
            },
            launch {
                // Suspends until canceled
                delay(Duration.INFINITE)
            },
            launch {
                val channel = Channel<Int>()
                // Suspends while waiting for a value that is never sent
                channel.receive()
            },
            launch {
                val deferred = CompletableDeferred<Int>()
                // Suspends while waiting for a value that is never completed
                deferred.await()
            },
            launch {
                val mutex = Mutex(locked = true)
                // Suspends while waiting for a mutex that remains locked indefinitely
                mutex.lock()
            }
        )

        // Gives the child coroutines time to start and suspend
        delay(100.milliseconds)

        // Cancels all child coroutines
        childJobs.forEach { it.cancel() }
    }
    println("All child jobs completed!")
}

Все suspend-функции из библиотеки kotlinx.coroutines взаимодействуют с отменой, потому что внутри используют suspendCancellableCoroutine(), которая проверяет отмену при приостановке корутины. В отличие от них, пользовательские suspend-функции, использующие suspendCoroutine(), не реагируют на отмену.

Явная проверка отмены

Если корутина долго не приостанавливается, она не остановится при отмене, пока явно не проверит, была ли отменена.

Для проверки отмены используйте следующие API:

  • Свойство isActive равно false, когда корутина отменена.
  • Функция ensureActive() немедленно выбрасывает CancellationException, если корутина отменена.
  • Функция yield() приостанавливает корутину, освобождая поток и давая другим корутинам шанс выполниться на нём. Приостановка позволяет корутине проверить отмену и выбросить CancellationException, если корутина отменена.

Эти API полезны, когда корутины долго работают между точками приостановки или, вероятно, не будут приостанавливаться в этих точках.

isActive

Используйте свойство isActive в долгих вычислениях, чтобы периодически проверять отмену. Это свойство равно false, когда корутина больше не активна, и его можно использовать, чтобы аккуратно остановить корутину, когда ей больше не нужно продолжать операцию.

Например:

import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds
import kotlin.random.Random

//sampleStart
suspend fun main() {
    withContext(Dispatchers.Default) {
        val unsortedList = MutableList(10) { Random.nextInt() }

        // Starts a long-running computation
        val listSortingJob = launch {
            var i = 0

            // Repeatedly sorts the list while the coroutine remains active
            while (isActive) {
                unsortedList.sort()
                ++i
            }
            println(
                "Stopped sorting the list after $i iterations"
            )
        }
        // Sorts the list for 100 milliseconds, then considers it sorted enough
        delay(100.milliseconds)

        // Cancels the sorting when the result is good enough
        listSortingJob.cancel()

        // Waits until the sorting coroutine finishes
        // before accessing the shared list to avoid data races
        listSortingJob.join()
        println("The list is probably sorted: $unsortedList")
    }
}
//sampleEnd

В этом примере функция join() приостанавливает корутину до завершения другой корутины. Это гарантирует, что к списку не будет доступа, пока корутина сортировки ещё выполняется.

Функцию cancelAndJoin() можно использовать, чтобы отменить корутину и дождаться её завершения одним вызовом.

ensureActive()

Используйте функцию ensureActive(), чтобы проверить отмену и остановить текущее вычисление, выбросив CancellationException, если корутина отменена:

import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

suspend fun main() {
    withContext(Dispatchers.Default) {
        val childJob = launch {
            var start = 0
            try {
                while (true) {
                    ++start
                    // Checks the Collatz conjecture for the current number
                    var n = start
                    while (n != 1) {
                        // Throws CancellationException if the coroutine is canceled
                        ensureActive()
                        n = if (n % 2 == 0) n / 2 else 3 * n + 1
                    }
                }
            } finally {
                println("Checked the Collatz conjecture for 0..${start-1}")
            }
        }
        // Runs the computation for one second
        delay(100.milliseconds)

        // Cancels the coroutine
        childJob.cancel()
    }
}

yield()

Функция yield() приостанавливает корутину и проверяет отмену перед возобновлением. Без приостановки корутины в одном потоке выполняются последовательно.

Используйте yield, чтобы дать другим корутинам выполниться в том же потоке или пуле потоков до того, как одна из них завершится:

import kotlinx.coroutines.*

//sampleStart
fun main() {
    // runBlocking uses the current thread for running all coroutines
    runBlocking {
        val coroutineCount = 5
        repeat(coroutineCount) { coroutineIndex ->
            launch {
                val id = coroutineIndex + 1
                repeat(5) { iterationIndex ->
                    val iteration = iterationIndex + 1
                    // Temporarily suspends to give other coroutines a chance to run
                    // Without this, the coroutines run sequentially
                    yield()
                    // Prints the coroutine index and iteration index
                    println("$id * $iteration = ${id * iteration}")
                }
            }
        }
    }
}
//sampleEnd

В этом примере каждая корутина использует yield(), чтобы дать другим корутинам выполниться между итерациями.

Прерывание блокирующего кода при отмене корутин

На JVM некоторые функции, например Thread.sleep() или BlockingQueue.take(), могут блокировать текущий поток. Такие блокирующие функции можно прервать, из-за чего они завершатся раньше. Однако если вызвать их из корутины, отмена не прервёт поток.

Чтобы при отмене корутины прервать поток, используйте функцию runInterruptible():

import kotlinx.coroutines.*

//sampleStart
suspend fun main() {
    withContext(Dispatchers.Default) {
        val childStarted = CompletableDeferred<Unit>()
        val childJob = launch {
            try {
                // Cancellation triggers a thread interruption
                runInterruptible {
                    childStarted.complete(Unit)
                    try {
                        // Blocks the current thread for a very long time
                        Thread.sleep(Long.MAX_VALUE)
                    } catch (e: InterruptedException) {
                        println("Thread interrupted (Java): $e")
                        throw e
                    }
                }
            } catch (e: CancellationException) {
                println("Coroutine canceled (Kotlin): $e")
                throw e
            }
        }
        childStarted.await()

        // Cancels the coroutine and interrupts the thread
        // by running Thread.sleep()
        childJob.cancel()
    }
}
//sampleEnd

Безопасная обработка значений при отмене корутин

Когда приостановленная корутина отменяется, она возобновляется с CancellationException, а не возвращает значения, даже если эти значения уже доступны. Такое поведение называется немедленной отменой (prompt cancellation). Оно не даёт коду продолжить выполнение в области видимости отменённой корутины, например обновить экран, который уже закрыт.

Например:

import java.nio.file.*
import java.nio.charset.*
import kotlinx.coroutines.*
import java.io.*

// Defines a coroutine scope that uses the UI thread
class ScreenWithFileContents(private val scope: CoroutineScope) {
    fun displayFile(path: Path) {
        scope.launch {
            val contents = withContext(Dispatchers.IO) {
                Files.newBufferedReader(
                    path, Charset.forName("US-ASCII")
                ).use {
                    it.readLines()
                }
            }
            // It's safe to call updateUi here,
            // In case of cancellation, withContext() wouldn't return any values
            updateUi(contents)
        }
    }

    // Throws an exception if called after the user left the screen
    private fun updateUi(contents: List<String>) {
      contents.forEach { line -> addOneLineToUi(line) }
    }

    private fun addOneLineToUi(line: String) {
        // Placeholder for code that adds one line to the UI
    }

    // Only callable from the UI thread
    fun leaveScreen() {
        // Cancels the scope when leaving the screen
        // You can no longer update the UI
        scope.cancel()
    }
}

В этом примере withContext(Dispatchers.IO) взаимодействует с отменой и не даёт updateUi() выполниться, если функция leaveScreen() отменит корутину до того, как та вернёт содержимое файла.

Хотя немедленная отмена предотвращает использование значений после того, как они стали недействительными, она также может остановить код, пока важное значение всё ещё используется, что может привести к потере этого значения. Такое возможно, когда корутина получает значение, например ресурс AutoCloseable, но отменяется до того, как доходит до участка кода, который его закрывает. Чтобы этого избежать, размещайте логику очистки там, где она гарантированно выполнится, даже если корутина, получившая значение, будет отменена.

Например:

import java.nio.file.*
import java.nio.charset.*
import kotlinx.coroutines.*
import java.io.*

// scope is a coroutine scope using the UI thread
class ScreenWithFileContents(private val scope: CoroutineScope) {
    fun displayFile(path: Path) {
        scope.launch {
            // Stores the reader in a variable, so the finally block can close it
            var reader: BufferedReader? = null

            try {
                withContext(Dispatchers.IO) {
                    reader = Files.newBufferedReader(
                        path, Charset.forName("US-ASCII")
                    )
                }
                // Uses the stored reader after withContext() completes
                updateUi(reader!!)
            } finally {
                // Ensures the reader is closed even when the coroutine is canceled
                reader?.close()
            }
        }
    }

    private suspend fun updateUi(reader: BufferedReader) {
        // Shows the file contents
        while (true) {
            val line = withContext(Dispatchers.IO) {
                reader.readLine()
            }
            if (line == null)
                break
            addOneLineToUi(line)
        }
    }

    private fun addOneLineToUi(line: String) {
        // Placeholder for code that adds one line to the UI
    }

    // Only callable from the UI thread
    fun leaveScreen() {
        // Cancels the scope when leaving the screen
        // You can no longer update the UI
        scope.cancel()
    }
}

В этом примере сохранение BufferedReader в переменной и его закрытие в блоке finally гарантируют освобождение ресурса, даже если корутина будет отменена.

Запуск неотменяемых блоков

Можно запретить отмене влиять на отдельные части корутины. Для этого передайте NonCancellable как аргумент функции-конструктору корутины withContext().

Внимание. Не используйте NonCancellable с другими конструкторами корутин, такими как .launch() или .async(). Это нарушает структурированный параллелизм, разрывая отношение между родительской и дочерней корутинами.

NonCancellable полезен, когда нужно гарантировать, что некоторые операции, например закрытие ресурсов при помощи suspend-функции close(), завершатся даже в случае отмены корутины до окончания этих операций.

Например:

import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

//sampleStart
val serviceStarted = CompletableDeferred<Unit>()

fun startService() {
    println("Starting the service...")
    serviceStarted.complete(Unit)
}

suspend fun shutdownServiceAndWait() {
    println("Shutting down...")
    delay(100.milliseconds)
    println("Successfully shut down!")
}

suspend fun main() {
    withContext(Dispatchers.Default) {
        val childJob = launch {
            startService()
            try {
                awaitCancellation()
            } finally {
                withContext(NonCancellable) {
                    // Without withContext(NonCancellable),
                    // This function doesn't complete because the coroutine is canceled
                    shutdownServiceAndWait()
                }
            }
        }
        serviceStarted.await()
        childJob.cancel()
    }
    println("Exiting the program")
}
//sampleEnd

Тайм-аут

Тайм-ауты позволяют автоматически отменять корутину после заданной продолжительности. Они полезны для остановки операций, которые выполняются слишком долго, помогают сохранять отзывчивость приложения и избегать лишней блокировки потоков.

Чтобы задать тайм-аут, используйте функцию withTimeoutOrNull() с Duration:

import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.milliseconds

//sampleStart
suspend fun slowOperation(): Int {
    try {
        delay(300.milliseconds)
        return 5
    } catch (e: CancellationException) {
        println("The slow operation has been canceled: $e")
        throw e
    }
}

suspend fun fastOperation(): Int {
    try {
        delay(15.milliseconds)
        return 14
    } catch (e: CancellationException) {
        println("The fast operation has been canceled: $e")
        throw e
    }
}

suspend fun main() {
    withContext(Dispatchers.Default) {
        val slow = withTimeoutOrNull(100.milliseconds) {
            slowOperation()
        }
        println("The slow operation finished with $slow")
        val fast = withTimeoutOrNull(100.milliseconds) {
            fastOperation()
        }
        println("The fast operation finished with $fast")
    }
}
//sampleEnd

Если выполнение превышает указанную Duration, withTimeoutOrNull() возвращает null.