Skip to content

Kotlin Coroutines

Michael Bely edited this page Apr 29, 2024 · 60 revisions

Important

ВНИМАНИЕ!
ЭТОТ РАЗДЕЛ БОЛЬШЕ НЕ ПОДДЕРЖИВАЕТСЯ!
РОАДМАП ПЕРЕЕХАЛ В NOTION

Kotlin coroutines on Android
Flows on Android
Use Kotlin coroutines with Architecture components
Best practices for coroutines in Android
Scope Functions
Improve app performance with Kotlin coroutines
Coroutines Features
Курс по Kotlin Coroutines

Coroutines
API для асинхронных операций, которые могут быть приостановлены
• созданы для асинхронных операций (потоки - для многозадачности)
• возможность писать асинхронный код в синхронном стиле
• легко переключаться между корутинами без системных вызовов и блокировок (потоки Java блокируют ресурс и никому не дают его использовать)
• когда корутина приостанавливается - она освобождает поток, когда она готова - найдет первый свободный поток и продолжит работу
• не требуют системных ресурсов для работы, например, запуска нового потока, умно используют заданные пулы потоков (можно запустить миллиард корутин, но нельзя запустить миллиард потоков)
• не требуют поддержки со стороны ОС
• не гарантируется выполнение на одном и том же потоке

suspend
Помечает функцию как приостанавливаемую (используется как корутина)
• возвращает свой ответ асинхронно
• можно вызвать из другой suspend-функции или из корутины, которая создается с помощью coroutine builder
• под капотом - state machine, цикл switch-case

runCatching
Вызывает указанный функциональный блок и возвращает его инкапсулированный результат, если вызов был успешным, перехватывая любое исключение Throwable, которое было сгенерировано при выполнении функции блока, и инкапсулируя его как сбой

runBlocking
Блокирует поток, с которого запускается корутина. Не является расширением к типу CoroutineScope, в отличие от других билдеров. Предназначен для соединения обычного блокирующего кода с библиотеками, написанными в стиле приостановки, для использования в main-функции и тестах
• работает как scope, но не является раcширением CoroutineScope

Backpressure
У Flow backpressure заложена в Kotlin suspending functions. Если сборщик flow не может принимать новые данные в настоящий момент, он приостанавливает источник. Возобновление происходит позднее, когда сборщик flow снова сможет получать данные. Таким образом, в Kotlin нет необходимости выбирать тип источника данных, в отличие от RxJava

launch
Запускает новую корутину и не возвращает результат вызывающей стороне. Любую работу, которая считается «выстрелил и забыл», можно запустить с помощью launch
• является расширением CoroutineScope, может быть запущена только внутри scope

async
Запускает новую корутину и позволяет вернуть результат с помощью функции приостановки await
• является расширением CoroutineScope, может быть запущена только внутри scope

val jobs: List<Deferred<Int>> = List(100) {
    async(start = CoroutineStart.LAZY) {
        doWork()
    }
}
jobs.forEach { println(it.await()) }

await
Ожидает результата из объекта Deferred

Structures cuncurrency
Механизм, предоставляющий иерархическую структуру для организации работы coroutine. Все принципы строятся на основе CoroutineScope и оношения родитель-ребенок у Job
• scope хранит все ссылки на coroutines, запущенные в нем
• отмена scope - отмена coroutines
• отмена родительской Job приведет к отмене всех дочерних Job
• отмена дочерней Job приведет к отмене родительской Job и отмене всех других дочерних Job

CoroutineName
Задать осмысленное имя для корутины. Полезно при отладке

launch(CoroutineName("LongTask"))

Context

CoroutineContext
Определяет поведение корутины. Является набором параметров для выполнения coroutines
• каждая корутина выполняется в каком-либо контексте
• явно не создается, задается в scope либо при запуске корутины (в launch)
• можно объединить несколько контекстов в один

Job
Объект корутины. Идентифицирует корутину. Компонент контекста
• выполняемая задача, launch возвращают экземпляр Job
• с помощью Job можно управлять работой корутины и ее lifecycle, Job можно отменить
• на основе Job можно организовать иерархию parent-child
• lifecycle Job состоит из 6 состояний: New (при создании), Active (после старта), Completing (завершается), Completed (завершена + дочерние), Cancelling (отмена/ошибка), Canceled (отменена)

SupervisorJob
Сбой или отмена дочернего job не приведёт к сбою parent job и не повлияет на другие компоненты и scope. Если использовать обычный Job, то отказ дочерней приведет к немедленному отказу родительской. Приложение упадет, даже если обернуть код в блок try-catch

withContext
Переносит выполнение текущей корутины на новый контекст, в большинстве случаев на новый диспетчер

Scope

CoroutineScope
Отслеживает любую корутину, которую создает, используя launch или async. Scope хранит все ссылки на корутины, запущенные в нем. Scope может отменить выполнение всех дочерних корутин, если возникнет ошибка или операция будет отменена
• отмена дочернего scope приведет к отмене родительского scope и наоборот
• scope будет завершен успешно, когда выполнятся все корутины в нем

supervisorScope
Аналог SupervisorJob для scope. Важно, чтобы он работал независимо от UI, где может быть родительский scope

supervisorScope {
    ...
}

GlobalScope
Специальный CoroutineScope, который не привязан к какай-либо Job. Все корутины, запущенные в рамках него будут работать до своей остановки или остановки процесса. Использование может легко привести к утечке памяти
• этот scope невозможно отменить
• использование нарушает принципы structured concurrency и может привести к утечке памяти

Dispatchers

Coroutine context and dispatchers

Dispatcher
Отвечает за то, на каком потоке будет выполняться корутина
• диспетчеры лучше инжектить, чтобы было удобно тестировать

newSingleThreadContext()
Вручную запускает поток с указанным именем. API деликатное, использовать осторожно

launch(newSingleThreadContext("Custom Thread")) {
    println("coroutine flow: ${Thread.currentThread().name}")"
}

newFixedThreadPoolContext()
Позволяет создать собственный отдельный пул потоков. Все корутины по умолчанию выполняются в CommonPool. Все ресурсы будут освобождены после того как программа отработает. API деликатное, использовать осторожно

val pool: ExecutorCoroutineDispatcher = newFixedThreadPoolContext(8, "myPool")

Dispatcher.Default
Диспетчер по умолчанию используется, когда явно не указан другой диспетчер. Он представлен использует общий фоновый пул потоков. Этот диспетчер оптимизирован для выполнения задач с интенсивным использованием ЦП вне основного потока. Примеры использования включают сортировку списка и синтаксический анализ JSON
• используется по умолчанию в launch и async
• если все 64 потока CommonPool заняты, может создать больше

Dispatcher.IO
Так же как и Default основан на пуле потоков. Оптимизирован для выполнения дискового или сетевого ввода-вывода вне основного потока. Примеры включают использование Room, чтение файлов или запись в них, а также выполнение любых сетевых операций
• максимальный размер CommonPool для использования (не создания) - 64 потока

Dispatcher.Main
Выполнит suspend функции и обычные функции на main потоке (основном потоке Android), отдавая предпочтение обычным функциям. Это следует использовать только для взаимодействия с пользовательским интерфейсом и выполнения быстрой работы. Пример: обновление объектов LiveData

Dispatcher.Main.immediate
Выполнит suspend функции и обычные функции на main потоке по порядку

Dispatchers.Unconfined
Корутина не закреплена четко за определенным потоком или пулом потоков. Она запускается в текущем потоке до первой приостановки. После возобновления работы корутина продолжает работу в одном из потоков, который сторого не фиксирован. Разработчики языка Kotlin в обычной ситуации не рекомендуют использовать данный тип

val dispatcher: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(5)

Exceptions

CancellationException
Специальное исключение для обработки отмены выполнения корутин
• вызов cancel приведет к этому исключению

try {
    ...
} catch (e: CancellationException) {
    // обязательно пробрасываем дальше
    throw e
} catch (e: Exception) {
    // обрабатываем другие исключения
}

CoroutineExceptionHandler
Определить поведение для всех необработанных исключений, которые происходят в текущем контексте выполнения корутин. Общий catch-блок. Если исключение захвачено, то дальше к родителю оно пробрасываться не будет
• вызывается в последнюю очередь, когда произошла ошибка
• может быть вызван на любом потоке
• не будет ловить CancellationException, так как они не являются ошибками выполнения корутин

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
    ...
}
CoroutineScope(exceptionHandler)
scope.launch(exceptionHandler)

NonCancellable
Использовать для освобождения ресурсов при исключении
• только для использования в withContext()

val inputStream: InputStream
try {
    doSomethingLong(inputStream)
} catch (e: Exception) {
    // обрабатываем исключение
} finally {
    withContext(NonCancellable) {
        shutdown(inputStream)
    }
}
try {
    coroutineScope {
        ...
    }
} catch (e: Exception) {
    // обработать все исключения scope
}
/**
 * Не вызывается в случае CancelException и если исключение произошло до задания CompletionHandler
 */
job.invokeOnCompletion { cause: Throwable? ->
    if (cause != null) {
        // произошла ошибка
    } else {
        // корутина успешно выполнена
    }
}

Channels

Обеспечивают передачу потока значений между корутинами. Похожи на BlockingQueue, но без блокирующих операций put и take

Flow

Типы функций над потоками?
В зависимости от того, возвращают они конкретное значение или обработанный поток функции делятся на: терминальные и промежуточные. Терминальные функции потоков представляют suspend-функции, которые позволяют получать объекты из потока или возвращают какое-то конечное значение: collect, toList, toSet first, firstOrNull, last, lastOrNull, single, singleOrNull, count, reduce, fold. Промежуточные функции не являются suspend, принимают поток и возвращают обработанный поток: combine, drop, filter, filterNot, filterNotNull, map, onEach, take, transform, zip

Flow
Cold flow. Асинхронный поток данных, который последовательно эмитит данные и завершается успешно или с исключением. Похож на Observable в RxJava

SharedFlow
Hot flow. Генерирует события, даже если вы не вызываете collect() на нем
• нет возможность синхронно получить value
• всегда отправляет новые значения, даже если они одинаковые
• может иметь несколько подписчиков

MutableSharedFlow
Мутабельный SharedFlow

StateFlow
Hot flow. Это специализация SharedFlow. Доставляет последнее событие только новым подписчикам
• есть возможность синхронно получить value
• сравнивает предыдущее значение с новым, игнорирует если одинаковые

MutableStateFlow
Мутабельный StateFlow

emit
Метод передачи события, suspend function

tryEmit
Попытается передать событие без приостановки

asStateFlow
Конвертирует мутабельный flow в иммутабельный (только для чтения)

stateIn
Преобразует холодный поток в горячий. Возвращает оператор StateFlow. Полезен в ситуациях, когда имеется холодный поток, предоставляющий обновления значения некоторого состояния, создание и/или обслуживание которого требует больших затрат, но имеется несколько подписчиков, которым необходимо собирать самое последнее значение состояния

val stateFlow: StateFlow<Int> = flow
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = 0
    )

shareIn
Преобразует холодный поток в горячий. Возвращает оператор SharedFlow

val sharedFlow: SharedFlow<Int> = flow
    .shareIn(
        scope = viewModelScope,
        started = SharingStarted.Lazily,
        replay = 0
    )

callbackFlow

val locationsSource: Flow<Location> = callbackFlow<Location> {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult?) {
            result ?: return
            offer(result.lastLocation)
        }
    }
    requestLocationUpdates(createLocationRequest(), callback, Looper.getMainLooper())
        .addOnFailureListener { e ->
            close(e) // in case of exception, close the Flow
        }
    awaitClose { // clean up when Flow collection ends
        removeLocationUpdates(callback)
    }
}

flowOf
Cоздает поток из набора переданных в функцию значений

val numberFlow: Flow<Int> = flowOf(1, 2, 3, 5, 8)

asFlow
Стандартные коллекции и последовательности в Kotlin имеют метод расширения asFlow(), который позволяет преобразовать коллекцию или последовательность в поток

val numberFlow: Flow<Int> = (1..5).asFlow()
val nameFlow: Flow<String> = listOf("Tom", "Sam", "Bob").asFlow()

flowOn
Изменить CoroutineContext потока. flowOn изменяет CoroutineContext восходящего потока, то есть производитель и любые промежуточные операторы, применяемые до (или выше) flowOn. Если имеется несколько операторов flowOn, каждый из них изменяет восходящий поток относительно своего текущего местоположения.

flowOf("A", "B", "C")
    .catch { Timber.e(it) }
    .flowOn(Dispatchers.IO)
    .collect {}

collect
Получить все значения в потоке по мере их создания

flowOf("A", "B", "C")
    .collect {} // A, B, C

collectLatest
Оператор терминального потока, который собирает данный поток с заданным действием. Принципиальное отличие от collect заключается в том, что когда исходный поток выдает новое значение, блок действий для предыдущего значения отменяется

flow {
    emit(1)
    delay(50)
    emit(2)
}.collectLatest { value ->
    println("collect $value")
    delay(100)
    println("$value collected")
} // collect 1, collect 2, 2 collected

catch
Оператор обработки исключений

newsRepository.favoriteLatestNews
    .catch { exception -> notifyError(exception) }
    .collect { favoriteNews ->
        // Update View with the latest favorite news
    }

toList
Преобразует поток значений в коллекцию List

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val list: MutableList<Int> = mutableListOf()
numberFlow.toList()
numberFlow.toList(list)

toSet
Преобразует поток значений в коллекцию Set

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val set: LinkedHashSet<Int> = linkedSetOf()
numberFlow.toSet()
numberFlow.toSet(set)

toCollection
Collects given flow into a collection

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
val collection: MutableList<Int> = mutableListOf()
numberFlow.toCollection(collection)

first
Получает первый объект из потока

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.first() // Tom
nameFlow.first { name -> name.length > 3 } // Kate

firstOrNull
Получает первый объект из потока или null, если список пустой или условию не соответствует ни один из элементов потока

val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.firstOrNull()
nameFlow.firstOrNull { name -> name.length > 3 }

last
Получает последний объект из потока

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.last() // Alice

lastOrNull
Получает последний объект из потока или null

val nameFlow: Flow<String> = listOf<String>().asFlow()
nameFlow.lastOrNull() // null

single
Возвращает единственный элемент потока, если поток содержит только один элемент. Если поток не содержит элементов генерируется исключение NoSuchElementException, а если в потоке больше одного элемента - исключение IllegalStateException

val nameFlow: Flow<String> = listOf("Tom").asFlow()
nameFlow.single() // Tom

singleOrNull
Ожидает получение одного объекта из потока или null, если поток пуст или если в потоке больше одного элемента

val nameFlow: Flow<String> = listOf("Tom", "Bob").asFlow()
nameFlow.singleOrNull() // null

count
Получить количество элементов в потоке

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.count() // 5
nameFlow.count { name -> name.length > 3 } // 2

reduce
Получает результат определенной операции над элементами потока. Сводит все значения потока к одному значению. Первый параметр при первом запуске представляет первый объект потока, а при последующих запусках - результат функции над предыдущими объектами. А второй параметр функции - следующий объект

val numberFlow: Flow<Int> = listOf(1, 2, 3).asFlow()
numberFlow.reduce { accumulator, value -> accumulator + value } // 6

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.reduce { accumulator, value -> "$accumulator $value" } // Tom Bob Kate Sam Alice

fold
Получает результат определенной операции над элементами потока, в отличие от функции reduce() принимает начальное значение. Сводит все элементы потока в один.

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.fold("Names:") { accumulator, value -> "$accumulator $value" } // Names: Tom Bob Kate Sam Alice

combine
Объединяет два потока в один, после применения к их элементам функции преобразования. Последние элементы потоков всегда объединяются.

flowOf("A", "B")
    .combine(flowOf("1", "2", "3")) { a, b ->
        a + b
    } // A1, B1, B2, B3

combineTransform
Возвращает поток, значения которого генерируются функцией преобразования, обрабатывающей последние переданные значения каждым потоком

combineTransform(
    flowOf("A", "B", "C"), 
    flowOf("1", "2", "3")
) { letter, number ->
    val result = "$letter$number"
    emit(result)
} // A1, B1, B2, C2, C3

drop
Удаляет из потока определенное количество элементов. В качестве параметра принимает количество элементов с начала потока, которые надо убрать

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.drop(3) // Sam, Alice

dropWhile
Удаляет из потока элементы, пока они не начнут соответствовать некоторому условию

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Alice", 32),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 25)
).asFlow()
 
peopleFlow.dropWhile { person -> person.age > 17 } // Bill, Sam, Bob

filter
Фильтрует объекты в потоке

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()
 
peopleFlow.filter { person -> person.age > 17 } // Tom, Bob

filterNot
Фильтрует поток, оставляя те элементы, которые не соответствуют условию

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()
 
peopleFlow.filterNot { person -> person.age > 17 } // Bill, Sam

filterNotNull
Фильтрует поток, удаляя все элементы, которые равны null

val peopleFlow: Flow<Person?> = listOf(
    Person("Tom", 37),
    null
    Person("Sam", 14),
    null
).asFlow()
 
peopleFlow.filterNotNull() // Tom, Sam

filterIsInstance

val anyFlow: Flow<Any> = listOf(
    "Alice",
    1,
    "Bob",
    2
).asFlow()

anyFlow.filterIsInstance<Int>() // 1, 2

map
Преобразует данные потока. В качестве параметра он принимает функцию преобразования. Функция преобразования принимает в качестве единственного параметра объект из потока и возвращает преобразованные данные

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Sam", 41),
    Person("Bob", 21),
    Person("Bill", 5)
).asFlow()

peopleFlow.map { person -> person.name } // Tom, Sam, Bob, Bill

peopleFlow
    .map { person -> object {
        val name: String = person.name
        val isAdult: Boolean = person.age > 17
    } }

mapNotNull
Возвращает поток, содержащий только ненулевые результаты применения заданной функции преобразования к каждому значению исходного потока

val peopleFlow: Flow<Person?> = listOf(
    Person("Tom", 37),
    null,
    Person("Sam", 41),
    null,
    Person("Bob", 21),
    null,
    Person("Bill", 5)
).asFlow()

peopleFlow.mapNotNull { person -> person?.name } // Tom, Sam, Bob, Bill

onEach
Применяет к элементам потока определенную функцию перед тем, как они будут переданы в возвращаемый поток

val numberFlow: Flow<Int> = flowOf(1, 2, 3)
numberFlow.onEach { delay(1000L) }

take
Ограничивает количество элементов в потоке. В качестве параметра принимает количество элементов с начала потока, которые надо оставить

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Kate", "Sam", "Alice").asFlow()
nameFlow.take(3) // Tom, Bob, Kate

takeWhile
Выбирает из потока элементы, пока будет истино некоторое условие

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Alice", 32),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 25)
).asFlow()
 
peopleFlow.takeWhile { person -> person.age > 17 } // Tom, Alice

transform
Выполняем преобразование объектов в потоке. В отличие от map позволяет использовать функцию emit(), чтобы передавать в поток произвольные объекты

val peopleFlow: Flow<Person> = listOf(
    Person("Tom", 37),
    Person("Bill", 5),
    Person("Sam", 14),
    Person("Bob", 21)
).asFlow()

peopleFlow
    .transform { person ->
        if (person.age > 17) {
            emit(person.name)
        }
    } // Tom, Bob

val numberFlow: Flow<Int> = listOf(2, 3, 4).asFlow()
numberFlow
    .transform { number ->
        emit(number)
        emit(number * number)
    } // 2, 4, 3, 9, 4, 16

transformLatest
Возвращает поток, который создает элемент с помощью функции преобразования каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий блок преобразования отменяется

flow {
    emit("A")
    delay(100)
    emit("B")
}.transformLatest { value ->
    emit(value)
    delay(200)
    emit(value + "_last")
} // A, B, B_last

transformWhile
Применяет функцию преобразования к каждому значению данного потока, пока эта функция возвращает значение true

zip
Позволяет объединить 2 потока данных. Оператор zip принимает два параметра. Первый параметр - поток данных, с которым надо выполнить объединение. Второй параметр - собственно функция объединения. Она принимает соответствующие элементы обоих потоков в качестве параметров и возвращает результат их объединения

val nameFlow: Flow<String> = listOf("Tom", "Bob", "Sam").asFlow()
val ageFlow: Flow<Int> = listOf(37, 41, 25).asFlow()
nameFlow
    .zip(ageFlow) { name, age -> Person(name, age) }
    .collect { person -> println("Name: ${person.name}, Age: ${person.age}") }

debounce
Не позволяет выполнить операцию, пока не истечет установленный таймер. Удобно для полей ввода текста. После получения события запускается таймер. Если новое событие приходит, когда таймер активен - таймер перезапускается. Если таймер истек, мы испускаем последний испущенный элемент

flow {
    emit(1)
    delay(90)
    emit(2)
    delay(90)
    emit(3)
    delay(1010)
    emit(4)
    delay(1010)
    emit(5)
}.debounce(1000) // 3, 4, 5

sample
Подобно debounce используется для фильтрации элементов потока, но имеет отличие - вместо проверка интервала от последнего элемента запускается периодически и отправляет последний элемент за интервал. Например, поток выдает элементы каждые 50 мс, и если применить выборку в 100 мс, получим каждый второй элемент. С оператором debounce получили бы только элемент 9.

flow {
    repeat(10) {
        emit(it)
        delay(50)
    }
}.sample(100) // 1, 3, 5, 7, 9

flatMapMerge
Сопоставляет каждый элемент потока с новым потоком, представленным операцией преобразования, а затем объединяет элементы этих потоков и сглаживает его

flowOf(1, 2, 3)
    .flatMapMerge { number: Int ->
        flowOf("$number A", "$number B")
    } // 1A, 1B, 2A, 2B, 3A, 3B

flatMapConcat
Подобен flatMapMerge, но здесь потоки объединяются, а не сливаются. Важно: flatmapMerge имеет лучшую производительность, поскольку может обрабатывать потоки параллельно

flowOf(1, 2, 3)
    .flatMapConcat { number: Int ->
        flowOf("$number A", "$number B")
    } // 1A, 1B, 2A, 2B, 3A, 3B

flatMapLatest
Возвращает поток, который переключается на новый поток, создаваемый функцией преобразования, каждый раз, когда исходный поток выдает значение. Когда исходный поток выдает новое значение, предыдущий поток, созданный блоком преобразования, отменяется

flow {
    emit("A")
    delay(100)
    emit("B")
}.flatMapLatest { value ->
    flow {
        emit(value)
        delay(200)
        emit(value + "_last")
    }
} // A, B, B_last

buffer
Не меняет поток элементов, а помогает с производительностью. Если мы добавим буфер между операторами onEach и collect, он создаст отдельную сопрограмму для параллельного сбора элементов из оператора onEach в соответствии с предоставленной емкостью буфера. Это позволит нам собрать все элементы за 300 мс, так как все операторские операции не должны выполняться последовательно

flowOf("A", "B", "C")
    .onEach  { delay(300) } // Output takes 900ms as all items are processed sequentially

flowOf("A", "B", "C")
    .onEach  { delay(300) }
    .buffer(3) // Output takes 300ms as all items are processed in parallel at buffer operator 
               // and delivered to collect immediately.

onEmpty
Вызывает действие, если поток завершается без испускания элементов

val nameFlow: Flow<String> = flowOf()
nameFlow.onEmpty {
    emit("Bob")
    emit("Alice")
} // Bob, Alice

conflate()
В случае нескольких вызовов emit равно обработает все, но соберет только последний вызов

Clone this wiki locally