Составление функций приостановки

В этом разделе рассматриваются различные подходы к композиции функций приостановки.

Последовательно по умолчанию

Предположим, что у нас есть две функции приостановки, определенные в другом месте, которые делают что-то полезное, например, какой-то удаленный вызов службы или вычисление. Мы просто притворяемся, что они полезны, но на самом деле каждый из них просто задерживает на секунду для целей этого примера:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Что мы делаем, если нам нужно, чтобы они вызывались последовательно — первый doSomethingUsefulOne, затем doSomethingUsefulTwo, и вычислить сумму их результатов? На практике мы делаем это, если используем результат первой функции, чтобы принять решение о том, нужно ли нам вызывать вторую, или решить, как ее вызывать.

Мы используем обычный последовательный вызов, потому что код в корутине, как и в обычном коде, по умолчанию последовательный. Следующий пример демонстрирует это, измеряя общее время, необходимое для выполнения обеих функций приостановки:

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Полный код находится здесь.

Получается что-то вроде этого:

The answer is 42
Completed in 2017 ms

Параллельное использование async

Что, если между вызовами doSomethingUsefulOne и doSomethingUsefulTwo нет зависимостей, и мы хотим получить ответ быстрее, выполняя оба одновременно? Здесь на помощь приходит async.

Концептуально async аналогична launch. Она запускает отдельную корутину, представляющую собой облегченный поток, который работает одновременно со всеми другими корутинами. Разница в том, что launch возвращает Job и не несет никакого результирующего значения, тогда как async возвращает Deferred — облегченное неблокирующее будущее значение, которое представляет собой обещание предоставить результат позже. Вы можете использовать .await() для отложенного значения, чтобы получить его окончательный результат, но Deferred также является Job, поэтому при необходимости вы можете отменить его.

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Полный код находится здесь.

Получается что-то вроде этого:

The answer is 42
Completed in 1017 ms

Это в два раза быстрее, потому что две корутины выполняются одновременно. Обратите внимание, что параллелизм с корутинами всегда явный.

Ленивый старт async

При желании async можно сделать ленивой, установив для параметра start значение CoroutineStart.LAZY. В этом режиме он запускает корутину только тогда, когда ее результат требуется для await или если вызывается функция start его Job. Запустите следующий пример:

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // some computation
        one.start() // start the first one
        two.start() // start the second one
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Полный код находится здесь.

Получается что-то вроде этого:

The answer is 42
Completed in 1017 ms

Итак, здесь две корутины определены, но не выполняются, как в предыдущем примере, а программист получает контроль над тем, когда именно начинать выполнение, вызывая start. Сначала мы запускаем one, затем two, а затем ждем завершения отдельных корутин.

Обратите внимание, что если мы просто вызовем await в println без предварительного вызова start для отдельных корутин, это приведет к последовательному поведению, поскольку await запускает выполнение корутины и ожидает ее завершения, что не является предполагаемым "ленивым" вариантом использования. Вариант использования для async(start = CoroutineStart.LAZY) — это замена стандартной функции lazy в случаях, когда вычисление значения включает приостановку функций.

Функции в асинхронном стиле

Мы можем определить функции в асинхронном стиле, которые вызывают doSomethingUsefulOne и doSomethingUsefulTwo асинхронно, используя построитель async корутины с использованием ссылки на GlobalScope для отказа от структурированного параллелизма. Мы называем такие функции суффиксом «...Async», чтобы подчеркнуть тот факт, что они только запускают асинхронные вычисления, и для получения результата необходимо использовать полученное отложенное значение.

GlobalScope — это деликатный API, который может иметь нетривиальные последствия, один из которых будет объяснен ниже, поэтому вы должны явным образом согласиться на использование GlobalScope с помощью @OptIn(DelicateCoroutinesApi::class).

// The result type of somethingUsefulOneAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

Обратите внимание, что эти функции xxxAsync не являются функциями приостановки. Их можно использовать откуда угодно. Однако их использование всегда подразумевает асинхронное (здесь это означает параллельное) выполнение их действия с вызывающим кодом.

В следующем примере показано их использование вне корутины:

import kotlinx.coroutines.*
import kotlin.system.*

// обратите внимание, что в этом примере у нас нет `runBlocking` справа от `main`
fun main() {
    val time = measureTimeMillis {
        // мы можем инициировать асинхронные действия вне корутины
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // но ожидание результата должно включать либо приостановку, либо блокировку.
        // здесь мы используем `runBlocking { ... }`, чтобы заблокировать основной поток в ожидании результата
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Полный код находится здесь.

Этот стиль программирования с асинхронными функциями приведен здесь только для иллюстрации, так как он популярен в других языках программирования. Использование этого стиля с корутинами Kotlin настоятельно не рекомендуется по причинам, описанным ниже.

Рассмотрим, что произойдет, если между строкой val one = somethingUsefulOneAsync() и выражением one.await() в коде будет какая-то логическая ошибка, и программа выдаст исключение, а операция, которую выполняла программа, прервется. Обычно глобальный обработчик ошибок может перехватывать это исключение, регистрировать и сообщать об ошибке разработчикам, но в противном случае программа может продолжать выполнять другие операции. Однако здесь у нас somethingUsefulOneAsync все еще работает в фоновом режиме, даже несмотря на то, что операция, которая его инициировала, была прервана. Эта проблема не возникает при структурированном параллелизме, как показано в разделе ниже.

Структурированный параллелизм с async

Давайте возьмем пример Параллельное использование async и извлечем функцию, которая одновременно выполняет doSomethingUsefulOne и doSomethingUsefulTwo и возвращает сумму их результатов. Поскольку построитель async корутин определен как расширение CoroutineScope, нам нужно иметь его в области действия, и это то, что предоставляет функция coroutineScope:

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

Таким образом, если внутри кода функции concurrentSum что-то пойдет не так, и она выдаст исключение, все корутины, запущенные в ее области действия, будут отменены.

import kotlinx.coroutines.*
import kotlin.system.*

fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        println("The answer is ${concurrentSum()}")
    }
    println("Completed in $time ms")
}

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // притворимся, что мы делаем что-то полезное здесь
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // притворимся, что мы тоже делаем что-то полезное здесь
    return 29
}

Полный код находится здесь.

У нас все еще есть параллельное выполнение обеих операций, как видно из вывода вышеприведенной функции main:

The answer is 42
Completed in 1017 ms

Отмена всегда распространяется через иерархию корутин:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // Эмулирует очень долгие вычисления
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> {
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

Полный код находится здесь.

Обратите внимание, как первый async и ожидающий родитель отменяются при сбое одного из дочерних элементов (а именно two):

Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException