본문 바로가기
Programando/Android

[Android/Kotlin] RxJava와 RxJava를 이용한 EventBus 구현

RxJava란 Reactive X(Reactive Extensions)를 Java로 구현한 라이브러리이며, Reactive X는 관찰 가능한(Observable) 스트림을 사용하는 비동기 프로그래밍을 위한 API입니다.

그리고 RxJava에 안드로이드용 스케쥴러 등 몇 가지의 클래스를 추가해 안드로이드 개발을 쉽게 할 수 있도록 하는 RxAndroid 라이브러리도 존재합니다.

 

종속성 추가

RxJava3를 사용하기 위해서는 먼저 app 모듈의 build.gradle에 종속성을 추가해야 합니다.

dependencies {
    implementation 'io.reactivex.rxjava3:rxjava:3.1.4'
}

 

Observable

RxJava에서는 Observable을 구독하는 Observer가 존재하고, 이 ObserverObservable이 순차적으로 발행하는 데이터에 대해서 반응합니다. Observable은 다음의 3가지 이벤트를 사용하여 동작합니다.

  • onNext() : 하나의 소스 Observable에서 Observer까지 한 번에 하나씩 순차적으로 데이터를 발행합니다. 데이터를 발행할 때 null은 발행할 수 없습니다.
  • onComplete() : 오류가 발생하지 않았다면 Observable은 데이터의 발행이 끝났음을 알리는 완료 이벤트를 Observer에 전달하여 onNext()를 더 이상 호출하지 않음을 나타냅니다.
  • onError() : 오류가 발생했음을 Observer에 전달합니다. 해당 메서드가 호출되면 onNext()onComplete()는 더 이상 호출되지 않습니다. 오류를 발행할 때 null은 발행할 수 없습니다.

 

Observable 생성

- create()

Observable.create()를 사용하면 Emitter를 이용하여 직접 아이템을 발행하고, 아이템의 발행 완료나 오류(Complete / Error)의 알림을 직접 설정할 수 있습니다.

Emitter를 통해 아이템을 발행한 후에는 해당 Observable을 구독하기 위해 subscribe()를 호출해서 Observer를 연결해야 합니다.

val source = Observable.create<String> { emitter ->
    emitter.onNext("Hello")
    emitter.onNext("World!")
    emitter.onComplete()
}
source.subscribe { 
    println(it)
}

create()를 통해 Observable 객체를 생성했다면 Observable을 더 이상 사용하지 않을 때에는 등록된 Callback을 모두 해제해주어야만 Memory Leak이 발생하지 않고, BackPressure를 직접 처리하지 않아도 됩니다.

 

Disposable

Observable 객체에서 발행할 아이템을 정의한 후, subscribe()를 통해 스트림을 생성하고 아이템을 발행하면 Disposable 객체가 반환됩니다.

val source = Observable.just("Hello", "RxJava", "World!")
val disposable: Disposable = source.subscribe{println(it)}

만약 Observable이 발행하는 아이템의 개수가 정해져 있다면 모두 발행된 이후에 onComplete()가 호출되고, 안전하게 종료될 것입니다. 하지만 아이템을 무한히 발행하거나 오랫동안 실행되는 Observable의 경우에는 제대로 종료해주지 않는다면 Memory Leak이 발생할 수 있습니다. 더 이상 Observable의 구독이 필요하지 않을 때에는 이를 dispose하는 것이 효율적이며, Disposable.dispose()를 호출해서 언제든지 아이템의 발행을 중단할 수 있습니다.

val source = Observable.just("Hello", "RxJava", "World!")
val disposable: Disposable = source.subscribe{println(it)}

Thread{
    try {
        Thread.sleep(3500)
    } catch (e: Exception) {
        e.printStackTrace()
    }
    disposable.dispose()
}.start()

 

반응형

 

CompositeDisposable

CompositeDisposable는 여러 Disposable을 추가해두고, 한 번에 dispose할 수 있도록 해줍니다.

val source = Observable.interval(1, TimeUnit.SECONDS)
val disposable1: Disposable = source.subscribe{println(it)}
val disposable2: Disposable = source.subscribe{println(it)}
val disposable3: Disposable = source.subscribe{println(it)}
val disposable4: Disposable = source.subscribe{println(it)}

val compositeDisposable = CompositeDisposable()
compositeDisposable.add(disposable1)
compositeDisposable.addAll(disposable2, disposable3, disposable4)

...

compositeDisposable.dispose()

 

EventBus의 구현

events는 PublishSubject 타입으로, 구독된 이후에 갱신된 값들만 받을 수 있도록 합니다. 만약 구독하기 전, 가장 최근의 값을 받고 싶다면 타입을 BehaviorSubject로 지정하면 됩니다.

object EventBus {
    private val events: PublishSubject<Any> = PublishSubject.create()

    fun post(event: Any) {
        events.onNext(event)
    }

    fun <T> listen(eventType: Class<T>): Observable<T> {
        return events.ofType(eventType)
    }
}

 

EventBus의 사용

Event 전송

EventBus.post("이벤트 발생!")

 

Event 수신

val textView = ObservableField("")

compositeDisposable = CompositeDisposable()
compositeDisposable.add(
    EventBus.listen(String::class.java).subscribe {
        // DataBinding
        textView.set(it)
    }
)

 

그 외

자료 참조

https://blog.yena.io/studynote/2020/10/11/Android-RxJava(1).html

반응형