Celeste Engineer

Androidとか自転車とか

RxJava 1.x -> 2.x: Observable.OnSubscribe<T> から ObservableOnSubscribe<T> へ移行する

RxJava 1.x には Observable の static なインナークラスとして Observable.OnSubscribe がいました。Javadoc にあるとおり、Observable を subscribe したときに呼ばれるメソッドを定義するためのフックになるクラスです。

RxJava 2.x ではインナークラスではなく ObservableOnSubscribe というクラスになっています。また、オーバライドすべきメソッドはcall(Subscriber<? super T>)ではなくsubscribe(ObservableEmitter<T>)になっています。

Observable.OnSubscribe<T>#call(Subscriber<? super T>)のなかで、unsubscribe したときにリソースの開放やリスナーの解除などをするフックを作っていた場合、RxAndroid のMainThreadSubscriptionでフックすることになります。そしてそのインスタンスSubscriber<? super T>#add(Subscription)に渡します。 これを RxJava 2.x に移行するとき、MainThreadSubscriptionMainThreadDisposableに、Subscriber<? super T>#add(Subscription)ObservableEmitter<T>#setDisposable(Disposable)に書き換えます。