-
-
Notifications
You must be signed in to change notification settings - Fork 5
RxJava
Important
ВНИМАНИЕ!
ЭТОТ РАЗДЕЛ БОЛЬШЕ НЕ ПОДДЕРЖИВАЕТСЯ!
РОАДМАП ПЕРЕЕХАЛ В NOTION
RxJava: Reactive Extensions for the JVM
RxJava - базука с большой огневой мощью. Обычно не используется и половины ее возможностей. RxJava - это реактивное программирование и backpressure
Streams
Observable, Single, Completable, Maybe, Flowable
Subjects
Subject – это абстрактный класс в RxJava, одновременно расширяющий класс Observable и реализующий интерфейс Observer. Subject – это hot observable. Виды: Publish, Replay, Behavior, Async, Unicast
subscribeOn
Влияет на операторы выше. Если указан только subscribeOn - все операторы будут выполняться в указанном потоке
observeOn
Влияет на операторы ниже. Если указан только observeOn - все операторы будут выполняться в текущем потоке, а операторы, расположенные ниже observeOn будут переключаться на поток, указанный в observeOn
map()
Обрабатывает отправленный объект и преобразует его в другой объект
Observable.just(1, 2, 3)
.map { number: Int -> number * number }
.subscribe { number: Int -> println(number) } // output: 1, 4, 9
flatMap()
Обрабатывает отправленный объект и преобразует его в другой объект типа Observable. Может чередовать элементы при генерировании, т.е. порядок генерируемых элементов не сохраняется
Observable.just(1, 2, 3)
.flatMap { number: Int -> Observable.just(number * number) }
.subscribe { number: Int -> println(number) } // output: 1, 4, 9
concatMap()
Похож на flatMap, но сохраняет порядок элементов
Observable.just(1, 2, 3)
.concatMap { number: Int -> Observable.just(number * number) }
.subscribe { number: Int -> println(number) } // output: 1, 4, 9
switchMap()
Отменяет подписку на предыдущий исходный Observable всякий раз, когда начинает испускаться новый элемент, таким образом всегда испуская элементы из текущего Observable
Observable.just(1, 2, 3)
.switchMap { number: Int -> Observable.just(number * number) }
.subscribe { number: Int -> println(number) } // output: 1, 2, 3
distinct()
Отфильтровать повторяющиеся события
Observable.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe { number: Int -> println(number) } // output: 1, 2, 3
merge
Объединяет несколько obserbavles в один
Observable.just(1, 2, 3)
.mergeWith(Observable.just(4, 5, 6))
.subscribe { item -> println(item) } // output: 1, 2, 3, 4, 5, 6
zip
Объединяет наборы элементов, испускаемых двумя или более observable, вместе с помощью указанной функции и испускает элементы на основе результатов этой функции
Observable.just(2, 4, 8)
.zipWith(Observable.just(3, 6, 9)) { first, last -> first + last }
.subscribe { item -> println(item) } // output: 5, 10, 17
Потоки в RxJava выполняются с помощью планировщиков. Scheduler управляет одним или несколькими потоками. Когда планировщику необходимо выполнить задачу он берет поток из своего пула и запускает в нем задачу
Schedulers.io()
Поддерживается неограниченным пулом потоков. Используется для операций ввода-вывода, не требующих интенсивного использования ЦП (взаимодействие с файловой системой, сетевые вызовы, базы данных)
Schedulers.computation()
Поддерживается ограниченным пулом потоков, размер которого не больше количества доступных процессоров (Runtime.availableProcessors). Использовать для вычислительных и ресурсоемких работ (изменение размера изображений, обработка больших наборов данных). Не делать таких потоков больше, чем доступно ядер, иначе производительность будет снижаться из-за переключения контекста и накладных расходов на создание потоков
Schedulers.newThread()
Создает новый поток для каждой запланированной задачи. Это планировщик стоит дорого, так как не происходит повторного переиспользования потока
Schedulers.from(Executor executor)
Создает настраиваемый планировщик, поддерживаемый указанным executor. Если задача запланирована, когда все потоки заняты - будет поставлена в очередь
Scheduler.from(Executors.newFixedThreadPool(n)) // ограничить количество одновременных потоков в пуле
Schedulers.single()
Поддерживается одним потоком, выполняет задачи последовательно в указанном порядке
Schedulers.trampoline()
Выполняет задачи в порядке FIFO (first in first out) одним из рабочих потоков. Используется при реализации рекурсии, чтобы не увеличивался стек вызовов
AndroidSchedulers.mainThread()
Основной поток - место, где происходит взаимодействие с пользователем
Disposable
Интерфейс с методами dispose() и isDisposed(). Работае как оболочка над другими операторами
CompositeDisposable
Объединяет несколько disposable, которые могут быть удалены одновременно
Представляет стрим объектов. Подписчики на Observable имеют коллбэки onNext(value), onComplete(), onError(throwable). onNext() может не вызываться, или вызываться произвольное количество раз. При завершении стрима вызывается onComplete() или onError()
Отправляет объект, который принимается в коллбэке onSuccess(value), или бросает исключение в коллбэк onError(throwable) в случае ошибки
Не возвращает никакого значения. На подписчиках вызывается onComplete() при удачном завершении или onError(throwable) в случае ошибки
Может отработать как Single или как Completable. На подписчиках вызывается один из трех коллбэков: onSuccess(value), onComplete() без какого-либо значения, или onError(throwable). Каждый из коллбэков может быть вызван один раз или не вызван вообще
Работает как Observable, но поддерживает backpressure по умолчанию
Backpressure
Ситуация, когда observable выдает элементы быстрее, чем оператор или подписчик может использовать. Observable имеет бесконечный буфер, в который будут добавляться элементы до тех пор, пока не случится OutOfMemoryError. BackpressureStrategy – это enum, который задает стратегию обработки backpressure
• BackpressureStrategy.MISSING
- стратегия не установлена
• BackpressureStrategy.ERROR
- в случае backpressure бросает исключение MissingBackpressureException
• BackpressureStrategy.BUFFER
- буфер будет расширяться каждый раз, когда ему потребуется больше памяти. Его первоначальный размер составляет 128 элементов
• BackpressureStrategy.DROP
- если потребитель занят обработкой данных, эта стратегия буквально отбрасывает все значения, которые не могут быть обработаны
• BackpressureStrategy.LATEST
- также отбрасывает неиспользуемые значения, но кэширует последнее. Его можно сравнить с БУФЕРОМ с постоянным размером в 1 элемент. Благодаря этому, когда потребитель запрашивает новое значение, он всегда получает самое последнее
Flowable.create<Int>({ emitter: FlowableEmitter<Int> ->
repeat(1_000) { count -> emitter.onNext(count) }
}, BackpressureStrategy.BUFFER)
.subscribe { println(it) }
Подписчики получают новые данные с момента подписки. PublishSubject не кэширует и не рассылает прошлые элементы (студент слушает лекцию с того момента, как вошел в аудиторию)
Подписчики получают все данные, отправленные до подписки и все новые данные с момента подписки (студент опоздал на лекцию, но получает ее содержимое с самого начала). Если ReplaySubject создается фабричным методом createWithSize(size: Int), то подписчики будут получать только заданное количество элементов, отправленных в прошлом
Подписчики получат последний элемент данных, разосланный до подписки и все новые данные с момента подписки
Подписчики получат только самый последний элемент данных, который был отправлен перед вызовом onComplete() (студент пришел на пару и получил только домашку)
Работает также как ReplaySubject, но может иметь только одного подписчика. Все последующие подписчики получают onError() с IllegalStateException
Home • Interviews • Android Architecture • Android Jetpack • Android Jetpack Compose • Android Releases • Android SDK • Android Views • Basic • Design • Git • GitHub • Gradle • Java • Kotlin • Kotlin Coroutines • RxJava