RxJava
란 Reactive X(Reactive Extensions)를 Java로 구현한 라이브러리이며, Reactive X는 관찰 가능한(Observable) 스트림을 사용하는 비동기 프로그래밍을 위한 API입니다.
그리고 RxJava
에 안드로이드용 스케쥴러 등 몇 가지의 클래스를 추가해 안드로이드 개발을 쉽게 할 수 있도록 하는 RxAndroid
라이브러리도 존재합니다.
종속성 추가
RxJava3
를 사용하기 위해서는 먼저 app 모듈의 build.gradle에 종속성을 추가해야 합니다.
dependencies {
implementation 'io.reactivex.rxjava3:rxjava:3.1.4'
}
Observable
RxJava
에서는 Observable
을 구독하는 Observer
가 존재하고, 이 Observer
는 Observable
이 순차적으로 발행하는 데이터에 대해서 반응합니다. Observable
은 다음의 3가지 이벤트를 사용하여 동작합니다.
onNext()
: 하나의 소스Observable
에서Observer
까지 한 번에 하나씩 순차적으로 데이터를 발행합니다. 데이터를 발행할 때 null은 발행할 수 없습니다.onComplete()
: 오류가 발생하지 않았다면Observable
은 데이터의 발행이 끝났음을 알리는 완료 이벤트를Observer
에 전달하여onNext()
를 더 이상 호출하지 않음을 나타냅니다.onError()
: 오류가 발생했음을Observer
에 전달합니다. 해당 메서드가 호출되면onNext()
나onComplete()
는 더 이상 호출되지 않습니다. 오류를 발행할 때 null은 발행할 수 없습니다.
Observable 생성
- create()
Observable.create()
를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템의 발행 완료나 오류(Complete / Error)의 알림을 직접 설정할 수 있습니다.
Emitter를 통해 아이템을 발행한 후에는 해당 Observable
을 구독하기 위해 subscribe()
를 호출해서 Observer
를 연결해야 합니다.
val source = Observable.create<String> { emitter ->
emitter.onNext("Hello")
emitter.onNext("World!")
emitter.onComplete()
}
source.subscribe {
println(it)
}
create()
를 통해 Observable
객체를 생성했다면 Observable을 더 이상 사용하지 않을 때에는 등록된 Callback을 모두 해제해주어야만 Memory Leak이 발생하지 않고, BackPressure를 직접 처리하지 않아도 됩니다.
Disposable
Observable
객체에서 발행할 아이템을 정의한 후, subscribe()
를 통해 스트림을 생성하고 아이템을 발행하면 Disposable
객체가 반환됩니다.
val source = Observable.just("Hello", "RxJava", "World!")
val disposable: Disposable = source.subscribe{println(it)}
만약 Observable
이 발행하는 아이템의 개수가 정해져 있다면 모두 발행된 이후에 onComplete()
가 호출되고, 안전하게 종료될 것입니다. 하지만 아이템을 무한히 발행하거나 오랫동안 실행되는 Observable
의 경우에는 제대로 종료해주지 않는다면 Memory Leak이 발생할 수 있습니다. 더 이상 Observable
의 구독이 필요하지 않을 때에는 이를 dispose하는 것이 효율적이며, Disposable.dispose()
를 호출해서 언제든지 아이템의 발행을 중단할 수 있습니다.
val source = Observable.just("Hello", "RxJava", "World!")
val disposable: Disposable = source.subscribe{println(it)}
Thread{
try {
Thread.sleep(3500)
} catch (e: Exception) {
e.printStackTrace()
}
disposable.dispose()
}.start()
CompositeDisposable
CompositeDisposable
는 여러 Disposable
을 추가해두고, 한 번에 dispose할 수 있도록 해줍니다.
val source = Observable.interval(1, TimeUnit.SECONDS)
val disposable1: Disposable = source.subscribe{println(it)}
val disposable2: Disposable = source.subscribe{println(it)}
val disposable3: Disposable = source.subscribe{println(it)}
val disposable4: Disposable = source.subscribe{println(it)}
val compositeDisposable = CompositeDisposable()
compositeDisposable.add(disposable1)
compositeDisposable.addAll(disposable2, disposable3, disposable4)
...
compositeDisposable.dispose()
EventBus의 구현
events는 PublishSubject
타입으로, 구독된 이후에 갱신된 값들만 받을 수 있도록 합니다. 만약 구독하기 전, 가장 최근의 값을 받고 싶다면 타입을 BehaviorSubject
로 지정하면 됩니다.
object EventBus {
private val events: PublishSubject<Any> = PublishSubject.create()
fun post(event: Any) {
events.onNext(event)
}
fun <T> listen(eventType: Class<T>): Observable<T> {
return events.ofType(eventType)
}
}
EventBus의 사용
Event 전송
EventBus.post("이벤트 발생!")
Event 수신
val textView = ObservableField("")
compositeDisposable = CompositeDisposable()
compositeDisposable.add(
EventBus.listen(String::class.java).subscribe {
// DataBinding
textView.set(it)
}
)
그 외
자료 참조
https://blog.yena.io/studynote/2020/10/11/Android-RxJava(1).html
'Programando > Android' 카테고리의 다른 글
[Android/Kotlin] 안드로이드 12 SplashScreen 대응하기 (0) | 2022.06.29 |
---|---|
[Android/Kotlin] Hilt (0) | 2022.05.22 |
[Android/Kotlin] 양방향 바인딩(Two-way DataBinding) (0) | 2022.05.02 |
[Android/Kotlin] SharedFlow를 이용한 EventBus 구현 (0) | 2022.04.17 |
[Android/Java] Retrofit2를 통해 RestAPI와 통신하기 (0) | 2022.03.17 |