Контекст корутины и диспетчеры

Корутины всегда выполняются в некотором контексте, представленном значением типа CoroutineContext, определенным в стандартной библиотеке Kotlin.

Контекст корутины представляет собой набор различных элементов. Основными элементами являются Job корутины, которую мы видели ранее, и ее диспетчер, который рассматривается в этом разделе.

Диспетчеры и потоки

Контекст корутины включает диспетчер корутины (см. CoroutineDispatcher), который определяет, какой поток или потоки использует соответствующая корутина для своего выполнения. Диспетчер корутины может ограничить выполнение корутины определенным потоком, отправить ее в пул потоков или позволить ей выполняться без ограничений.

Все билдеры корутин, такие как launch и async, принимают необязательный параметр CoroutineContext, который можно использовать для явного указания диспетчера для новой корутины и других элементов контекста.

Попробуйте следующий пример:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch { // онтекст родителя, основная корутина runBlocking
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // не ограничено -- будет работать с основным потоком
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // будет отправлено в DefaultDispatcher 
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // получит свой собственный новый поток
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

Полный код находится здесь.

Он выдает следующий вывод (возможно, в другом порядке):

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

Когда launch { ... } используется без параметров, он наследует контекст (и, следовательно, диспетчер) от CoroutineScope, из которого он запускается. В этом случае он наследует контекст основной корутины runBlocking, которая выполняется в main потоке.

Dispatchers.Unconfined — это специальный диспетчер, который также работает в main потоке, но на самом деле это другой механизм, который будет объяснен позже.

Диспетчер по умолчанию используется, когда в области действия явно не указан другой диспетчер. Он представлен Dispatchers.Default и использует общий фоновый пул потоков.

[newSingleThreadContext] создает поток для запуска корутины. Выделенный поток — очень дорогой ресурс. В реальном приложении он должен быть либо освобожден, когда он больше не нужен, с помощью функции close, либо сохранен в переменной верхнего уровня и повторно использован во всем приложении.

Неограниченный и ограниченный диспетчер

Диспетчер корутин Dispatchers.Unconfined запускает корутину в вызывающем потоке, но только до первой точки приостановки. После приостановки он возобновляет корутину в потоке, который полностью определяется вызванной функцией приостановки. Неограниченный диспетчер подходит для корутин, которые не потребляют процессорное время и не обновляют какие-либо общие данные (например, пользовательский интерфейс), ограниченные определенным потоком.

С другой стороны, диспетчер по умолчанию наследуется от внешнего CoroutineScope. Диспетчер по умолчанию для корутины runBlocking, в частности, ограничен вызывающим потоком, поэтому его наследование приводит к ограничению выполнения этим потоком с предсказуемым планированием FIFO.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Unconfined) { // не ограничено -- будет работать с основным потоком
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // контекст родителя, основная корутина runBlocking
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}

Полный код находится здесь.

Он выдает следующий вывод:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

Таким образом, корутина с контекстом, унаследованным от runBlocking {...}, продолжает выполняться в main потоке, а неограниченная возобновляется в потоке исполнителя по умолчанию, который использует функция delay.

Неограниченный диспетчер — это расширенный механизм, который может быть полезен в некоторых крайних случаях, когда диспетчеризация корутины для ее последующего выполнения не требуется или приводит к нежелательным побочным эффектам, поскольку некоторые операции в корутине должны выполняться сразу. Неограниченный диспетчер не должен использоваться в общем коде.

Отладка корутин и потоков

Корутины могут приостанавливаться в одном потоке и возобновляться в другом потоке. Даже с однопоточным диспетчером может быть сложно понять, что делала корутина, где и когда, если у вас нет специального инструментария.

Отладка с помощью IDEA

Плагин отдалки корутин для Kotlin упрощает отладку корутин в IntelliJ IDEA.

Отладка работает для версий 1.3.8 и выше kotlinx-coroutines-core.

Окно инструмента Debug содержит вкладку Coroutines. На этой вкладке вы можете найти информацию как о запущенных, так и о приостановленных корутинах. Корутины группируются по диспетчеру, на котором они выполняются.

Debugging coroutines

С отладчиком корутин вы можете:

  • Проверить состояние каждой корутины.
  • Смотреть значения локальных и захваченных переменных как для запущенных, так и для приостановленных корутин.
  • Смотреть полный стек создания корутин, а также стек вызовов внутри корутин. В стек входят все кадры с переменными значениями, даже те, которые были бы потеряны при стандартной отладке.
  • Получить полный отчет, содержащий состояние каждой корутины и ее стека. Чтобы получить его, щелкните правой кнопкой мыши по вкладке Coroutines и выберите Get Coroutines Dump.

Чтобы начать отладку корутины, вам просто нужно установить точки останова и запустить приложение в режиме отладки.

Узнайте больше об отладке корутин в руководстве.

Отладка с использованием логирования

Другой подход к отладке приложений с потоками без плагина отладки заключается в печати имени потока в файле журнала для каждого оператора журнала. Эта функция повсеместно поддерживается фреймворками ведения журналов. При использовании корутин само по себе имя потока не дает много контекста, поэтому kotlinx.coroutines включает средства отладки, чтобы упростить его.

Запустите следующий код с параметром JVM -Dkotlinx.coroutines.debug:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking<Unit> {

    val a = async {
        log("I'm computing a piece of the answer")
        6
    }
    val b = async {
        log("I'm computing another piece of the answer")
        7
    }
    log("The answer is ${a.await() * b.await()}")
 
}

Полный код находится здесь.

Есть три корутины. Основная корутины (№1) внутри runBlocking и две корутины, вычисляющие отложенные значения a (№2) и b (№3). Все они выполняются в контексте runBlocking и ограничены основным потоком. Вывод этого кода:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

Функция log печатает имя потока в квадратных скобках, и вы можете видеть, что это основной поток с добавленным к нему идентификатором выполняемой в данный момент корутины. Этот идентификатор последовательно присваивается всем созданным корутинам при включенном режиме отладки.

Режим отладки также включается, когда JVM запускается с параметром -ea. Подробнее о средствах отладки можно прочитать в документации по свойству DEBUG_PROPERTY_NAME.

Перескакивание между потоками

Запустите следующий код с параметром JVM -Dkotlinx.coroutines.debug (см. отладку):

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1")
                withContext(ctx2) {
                    log("Working in ctx2")
                }
                log("Back to ctx1")
            }
        }
    }
}

Полный код находится здесь.

Он демонстрирует несколько новых методов. Один использует runBlocking с явно указанным контекстом, а другой использует функцию withContext для изменения контекста корутины, оставаясь при этом в той же самой корутине, как вы можете видеть в выводе ниже:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

Обратите внимание, что в этом примере также используется функция use из стандартной библиотеки Kotlin для освобождения потоков, созданных с помощью newSingleThreadContext, когда они больше не нужны.

Job в контексте

Job корутины является частью ее контекста и может быть извлечено из него с помощью выражения coroutineContext[Job]:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}

Полный код находится здесь.

В режиме отладки выдает что-то вроде этого:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

Обратите внимание, что isActive в CoroutineScope — это просто удобный ярлык для coroutineContext[Job]?.isActive == true.

Дети корутины

Когда корутина запускается в CoroutineScope другой корутины, она наследует свой контекст через CoroutineScope.coroutineContext, и Job новой корутины становится дочерним по отношению к Job родительской корутины. Когда родительская корутина отменяется, все ее потомки также рекурсивно отменяются.

Однако это отношение родитель-потомок может быть явно переопределено одним из двух способов:

  1. Если при запуске корутины явно указана другая область видимости (например, GlobalScope.launch), то она не наследует Job от родительской области видимости видимости.
  2. Когда в качестве контекста для новой корутины передается другой объект Job (как показано в примере ниже), он переопределяет Job родительской области видимости.

В обоих случаях запущенная корутина не привязана к области видимости, из которой она была запущена, и работает независимо.

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs
        launch(Job()) { 
            println("job1: I run in my own Job and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // cancel processing of the request
    delay(1000) // delay a second to see what happens
    println("main: Who has survived request cancellation?")
}

Полный код находится здесь.

Вывод этого кода:

job1: I run in my own Job and execute independently!
job2: I am a child of the request coroutine
job1: I am not affected by cancellation of the request
main: Who has survived request cancellation?

Родительские обязанности

Родительская корутина всегда ожидает завершения всех своих потомков. Родителю не нужно явно отслеживать всех дочерних элементов, которые он запускает, и ему не нужно использовать Job.join, чтобы дождаться их в конце:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch  {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
}

Полный код находится здесь.

Результат будет:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

Именование корутин для отладки

Автоматически назначаемые идентификаторы хороши, когда корутины часто логируются, и вам просто нужно сопоставить записи логов, поступающие от одной и той же корутины. Однако, когда корутина связана с обработкой определенного запроса или выполнением какой-либо конкретной фоновой задачи, лучше назвать ее явно для отладки. Элемент контекста CoroutineName служит той же цели, что и имя потока. Он включен в имя потока, выполняющего эту корутину, когда включен режим отладки.

Следующий пример демонстрирует эту концепцию:

import kotlinx.coroutines.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun main() = runBlocking(CoroutineName("main")) {
    log("Started main coroutine")
    // run two background value computations
    val v1 = async(CoroutineName("v1coroutine")) {
        delay(500)
        log("Computing v1")
        252
    }
    val v2 = async(CoroutineName("v2coroutine")) {
        delay(1000)
        log("Computing v2")
        6
    }
    log("The answer for v1 / v2 = ${v1.await() / v2.await()}")
}

Полный код находится здесь.

Результат, с опцией JVM -Dkotlinx.coroutines.debug, будет:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

Объединение элементов контекста

Иногда нам нужно определить несколько элементов для контекста корутины. Для этого мы можем использовать оператор +. Например, мы можем запустить корутину с явно указанным диспетчером и с явно указанным именем одновременно:

import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
}

Полный код находится здесь.

Результат, с опцией JVM -Dkotlinx.coroutines.debug, будет:

I'm working in thread DefaultDispatcher-worker-1 @test#2

Область видимости корутины

Давайте объединим наши знания о контекстах, детях и jobs вместе. Предположим, что в нашем приложении есть объект с жизненным циклом, но этот объект не является корутиной. Например, мы пишем приложение для Android и запускаем различные корутины в контексте Android активити для выполнения асинхронных операций по выборке и обновлению данных, анимации и т. д. Все эти корутины должны быть отменены при уничтожении активити, чтобы избежать утечки памяти. Мы, конечно, можем манипулировать контекстами и jobs вручную, чтобы связать жизненные циклы activity и ее корутин, но kotlinx.coroutines предоставляет абстракцию, инкапсулирующую это: CoroutineScope. Вы уже должны быть знакомы с областью действия корутины, так как все билдеры корутин объявляются в ней как расширения.

Мы управляем жизненными циклами наших корутин, создавая экземпляр CoroutineScope, привязанный к жизненному циклу нашей активити. Экземпляр CoroutineScope может быть создан фабричными функциями CoroutineScope() или MainScope(). Первая создает область общего назначения, а вторая создает область для приложений пользовательского интерфейса и использует Dispatchers.Main в качестве диспетчера по умолчанию:

class Activity {
    private val mainScope = MainScope()
    
    fun destroy() {
        mainScope.cancel()
    }
    // to be continued ...

Теперь мы можем запускать корутину в рамках этого Activity, используя заданную scope. Для демонстрации мы запускаем десять корутин, которые задерживают на разное время:

    // class Activity continues
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

В нашей основной функции мы создаем активити, вызываем нашу тестовую функцию doSomething и уничтожаем активити через 500 мс. Это отменяет все корутины, которые были запущены из doSomething. Мы видим это, потому что после уничтожения активити больше не печатаются сообщения, даже если мы подождем немного дольше.

import kotlinx.coroutines.*

class Activity {
    private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes
    
    fun destroy() {
        mainScope.cancel()
    }

    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

fun main() = runBlocking<Unit> {
    val activity = Activity()
    activity.doSomething() // run test function
    println("Launched coroutines")
    delay(500L) // delay for half a second
    println("Destroying activity!")
    activity.destroy() // cancels all coroutines
    delay(1000) // visually confirm that they don't work
}

Полный код находится здесь.

Результат этого примера:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

Как видите, только первые две корутины печатают сообщение, а остальные отменяются одним вызовом job.cancel() в Activity.destroy().

Обратите внимание, что Android имеет собственную поддержку области действия корутины во всех сущностях с жизненным циклом. Смотрите соответствующую документацию.

Локальных данные потока

Иногда удобно иметь возможность передавать некоторые локальные данные потока в корутины или между ними. Однако, поскольку они не привязаны ни к какому конкретному потоку, это, скорее всего, приведет к шаблону, если сделать это вручную.

Для ThreadLocal на помощь приходит функция расширения asContextElement. Она создает дополнительный элемент контекста, который сохраняет значение данного ThreadLocal и восстанавливает его каждый раз, когда корутина переключает свой контекст.

Это легко продемонстрировать в действии:

import kotlinx.coroutines.*

val threadLocal = ThreadLocal<String?>() // declare thread-local variable

fun main() = runBlocking<Unit> {
    threadLocal.set("main")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

Полный код находится здесь.

В этом примере мы запускаем новую корутину в пуле фоновых потоков, используя Dispatchers.Default, поэтому она работает в другом потоке из пула потоков, но по-прежнему имеет значение локальной переменной потока, которое мы указали с помощью threadLocal.asContextElement(value = "launch"), независимо от того, в каком потоке выполняется корутина. Таким образом, вывод (с отладкой):

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

Легко забыть установить соответствующий элемент контекста. Локальная переменная потока, доступ к которой осуществляется из корутины, может иметь неожиданное значение, если поток, выполняющий корутину, отличается. Чтобы избежать таких ситуаций, рекомендуется использовать метод ensurePresent и отказоустойчивость при неправильном использовании.

ThreadLocal имеет первоклассную поддержку и может использоваться с любыми примитивами, предоставляемыми kotlinx.coroutines. Однако у него есть одно ключевое ограничение: когда локальный поток изменяется, новое значение не распространяется на вызывающую корутину (поскольку элемент контекста не может отслеживать все обращения к объекту ThreadLocal), а обновленное значение теряется при следующей приостановке. Используйте withContext для обновления локальной переменной потока в корутине, дополнительные сведения смотрите в разделе asContextElement.

В качестве альтернативы значение может храниться в классе-обертке, таком как class Counter(var i: Int), который, в свою очередь, хранится в локальной переменной потока. Однако в этом случае вы несете полную ответственность за синхронизацию потенциально одновременных изменений переменной в этом классе-обертке.

Для расширенного использования, например для интеграции с логированием MDC, контекстами транзакций или любыми другими библиотеками, которые внутренне используют локальные потоки для передачи данных, смотрите документацию по интерфейсу ThreadContextElement, который необходимо реализовать.