Основы сопрограмм

В этом разделе рассматриваются основные концепции сопрограмм.

Ваша первая сопрограмма

Запустите следующий код:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // запуск новой сопрограммы в фоне
        delay(1000L) // неблокирующая задержка на 1 секунду
        println("World!") // вывод результата после задержки
    }
    println("Hello,") // пока сопрограмма проводит вычисления, основной поток продолжает свою работу
    Thread.sleep(2000L) // блокировка основного потока на 2 секунды, чтобы сопрограмма успела произвести вычисления
}

Полный код можно посмотреть здесь.

Результат выполнения кода будет следующим:

Hello,
World!

По сути, сопрограммы - это легковесные потоки. Они запускаются с помощью билдера сопрограмм launch в контексте некоторого CoroutineScope. В примере выше мы запускаем новую сопрограмму в GlobalScope. Это означает, что время жизни новой сопрограммы ограничено только временем жизни всего приложения.

Вы можете получить тот же результат, заменив GlobalScope.launch {...} на thread {...} или на delay(...) с Thread.sleep (...). Попробуйте (не забудьте импортировать kotlin.concurrent.thread).

Если вы начнете с замены GlobalScope.launch на thread, то компилятор выдаст следующую ошибку:

Error: Kotlin: Suspend functions are only allowed to be called from a coroutine or another suspend function

Это связано с тем, что delay является функцией приостановки, которая не блокирует поток, а приостанавливает сопрограмму. Использовать её можно только из сопрограммы.

Связываем блокирующий и неблокирующий миры

В первом примере смешаны две функции: неблокирующая delay(...) и блокирующая Thread.sleep(...). Легко забыть, какая из них блокирует основной поток, а какая нет. Давайте подробно рассмотрим блокировку с помощью билдера runBlocking:

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // запуск новой сопрограммы в фоне
        delay(1000L)
        println("World!")
    }
    println("Hello,") // основной поток продолжает свою работу
    runBlocking {     // но это выражение блокирует основной поток
        delay(2000L)  // на 2 секунды
    }
}

Полный код можно посмотреть здесь

Результат выполнения кода будет тот же, но здесь используется только неблокирующая delay. Основной поток, вызывающий runBlocking, блокируется до завершения сопрограммы внутри runBlocking.

Этот пример можно переписать более идиоматическим способом, используя runBlocking, чтобы обернуть выполнение функции main:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> { // запуск основной сопрограммы
    GlobalScope.launch { // запуск новой сопрограммы в фоне
        delay(1000L)
        println("World!")
    }
    println("Hello,") // основная сопрограмма продолжает свою работу
    delay(2000L)      // задержка на 2 секунды
}

Полный код можно посмотреть здесь

Здесь runBlocking<Unit> { ... } работает как адаптер, который используется для запуска основной сопрограммы верхнего уровня. Мы явно указываем возвращаемый тип Unit, потому что правильно сформированная функция main в Kotlin должна возвращать Unit.

Таким же образом можно писать модульные тесты для функций приостановки:

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // здесь мы можем использовать функции приостановки
    }
}

Job

Использование задержки для того, чтобы сопрограмма успела выполнить свою работу - не лучший подход. Давайте явно подождем (неблокирующим способом), пока Job, запущенный в фоне, не будет выполнен.

import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = GlobalScope.launch { // запуск новой сопрограммы с сохранением ссылки на нее в Job
        delay(1000L)
        println("World!")
    }
    println("Hello,")
    job.join() // ждем завершения вложенной сопрограммы  
}

Полный код можно посмотреть здесь.

Результат прежний, но код основной сопрограммы никак не привязан к длительности фонового задания - job.

Структурированный параллелизм

Есть еще пара вещей, которые мы бы хотели получить перед тем, как использовать сопрограммы на практике. Когда мы используем GlobalScope.launch, мы создаем сопрограмму верхнего уровня. Несмотря на то, что сопрограмма легковесна, она всё равно потребляет некоторые ресурсы во время своей работы. Если мы забудем сохранить ссылку на только что запущенную сопрограмму, она все равно продолжит работать. Что делать, если код в сопрограмме зависает (например, мы по ошибке указали слишком большую задержку), что если мы запустили слишком много сопрограмм и исчерпали память? Необходимость вручную сохранять ссылки на все запущенные сопрограммы и присоединяться к ним чревата ошибками.

Есть более хорошее решение. В нашем коде можно использовать структурированный параллелизм. Вместо запуска сопрограмм в GlobalScope, как мы обычно делаем с потоками (потоки всегда глобальные), мы можем запускать сопрограммы в области видимости выполняемой нами операции.

В нашем примере есть функция main, которая превращается в сопрограмму с помощью билдера runBlocking. Каждый билдер, включая runBlocking, добавляет экземпляр CoroutineScope к области видимости своего блока с кодом. Мы можем запускать сопрограммы в этой области видимости без явного использования join, потому что внешняя сопрограмма (runBlocking в нашем примере) не завершится, пока не будут выполнены все сопрограммы, запущенные в ее области видимости. Таким образом, мы можем упростить наш пример:

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { // запуск сопрограммы в области видимости runBlocking
        delay(1000L)
        println("World!")
    }
    println("Hello,")
}

Полный код можно посмотреть здесь.

Scope builder

В дополнение к CoroutineScope, предоставляемой разными билдерами, можно объявить свою собственную область видимости с помощью билдера coroutineScope. Он создает область видимости и не завершается, пока не завершатся все запущенные дочерние сопрограммы.

runBlocking и coroutineScope могут выглядеть одинаково, потому что они обе ждут завершения всех операций внутри своего блока и завершения всех запущенных дочерних сопрограмм. Основное отличие заключается в том, что метод runBlocking блокирует текущий поток, в то время как coroutineScope лишь приостанавливает работу, высвобождая основной поток для других целей. Из-за этой разницы runBlocking является обычной функцией, а coroutineScope - функцией приостановки.

Следующий пример это демонстрирует:

import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch {
        delay(200L)
        println("Task from runBlocking")
    }

    coroutineScope { // Создание coroutine scope
        launch {
            delay(500L)
            println("Task from nested launch")
        }

        delay(100L)
        println("Task from coroutine scope") // Эта строка будет выведена перед вложенным launch
    }

    println("Coroutine scope is over") // Эта строка не будет выведена пока не выполнится вложенный launch
}

Полный код можно посмотреть здесь.

Обратите внимание, что сразу после сообщения "Task from coroutine scope" (во время ожидания выполнения вложенного launch) выполняется и выдаётся "Task from runBlocking", хотя выполнение coroutineScope еще не завершилось.

Извлечение функции

Извлечём блок кода внутри launch { ... } в отдельную функцию. Когда вы выполните рефакторинг кода внутри launch { ... } с помощью пункта "Extract function", то вы получите новую функцию с модификатором suspend. Это ваша первая функция приостановки. Функции приостановки могут использоваться внутри сопрограмм, как обычные функции, но их дополнительная особенность заключается в том, что они, в свою очередь, могут использовать другие функции приостановки (например, функцию delay) для приостановки выполнения сопрограммы.

import kotlinx.coroutines.*

fun main() = runBlocking {
    launch { doWorld() }
    println("Hello,")
}

// это ваша первая функция приостановки
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

Полный код можно посмотреть здесь.

Но что, если извлечённая функция содержит билдер сопрограммы, который вызывается в текущей области видимости? В этом случае модификатора suspend для извлечённой функции недостаточно. Сделать doWorld методом расширения на CoroutineScope - одно из решений, но оно не всегда может быть применимо, поскольку не делает API более понятным. Идиоматическое решение состоит в том, чтобы иметь либо явный CoroutineScope в качестве поля в классе, содержащем целевую функцию, либо неявный, когда внешний класс реализует CoroutineScope. В крайнем случае можно использовать CoroutineScope(coroutineContext), но такой подход структурно небезопасен, потому что вы теряете контроль над выполняемой областью видимости этого метода. Только приватные API могут использовать этот конструктор.

Легковесные сопрограммы

Запустите следующий код:

import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(100_000) { // запуск большого количества сопрограмм
        launch {
            delay(5000L)
            print(".")
        }
    }
}

Полный код можно посмотреть здесь.

Данный код запускает 100 тысяч сопрограмм, каждая из которых через 5 секунд печатает точку.

А теперь попробуйте сделать то же самое с потоками. Что произойдёт? (Скорее всего это вызовет ошибку, связанную с нехваткой памяти).

Глобальные сопрограммы похожи на демон-потоки

Нижеприведённый код запускает длительную сопрограмму в GlobalScope, которая два раза в секунду выводит сообщение "I'm sleeping", а затем, после некоторой задержки, происходит возврат из функции main:

import kotlinx.coroutines.*

fun main() = runBlocking {
    GlobalScope.launch {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // выход после некоторой задержки
}

Полный код можно посмотреть здесь.

Если вы запустите данный код, то увидите, что он трижды выводит сообщение и завершается:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

Активные сопрограммы, запущенные в GlobalScope, не поддерживают "жизнь" процесса. В этом они похожи на демон-потоки.