woshidan's blog

あいとゆうきとITと、とっておきの話。

RxJavaのmapのコードを眺めてみた

3行まとめ

  • mapメソッドを呼び出すと新しいObservableが生成されて返ってくる*1
  • 新しいObservableのOnSubscribe#callメソッドは、元のObservableのOnSubscribe#callメソッドに対して、新しく作ったsubscriberを与えて呼び出す、というもの
    • 新しいsubscriberの中身は、mapメソッドに与えたFunc1()の中身をかませるといった処理がonNextメソッドに入っている

詳細

まず、Observable.map(Func1<? super T, ? extends R> func)メソッドはこちら。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

じゃあ、liftメソッドはどうなっているかというと、

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                try {
                    // new Subscriber created and being subscribed with so 'onStart' it
                    st.onStart();
                    onSubscribe.call(st);
                } catch (Throwable e) {
                    // localized capture of errors rather than it skipping all operators 
                    // and ending up in the try/catch of the subscribe method which then
                    // prevents onErrorResumeNext and other similar approaches to error handling
                    Exceptions.throwIfFatal(e);
                    st.onError(e);
                }
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // if the lift function failed all we can do is pass the error to the final Subscriber
                // as we don't have the operator available to us
                o.onError(e);
            }
        }
    });
}

おう。

Observableの型がOperatorを受け取って、Operatorの返す型を(subscriberが)受け取るObservableに作り直している感じです。 そのとき、新しく作られるObservableのOnSubscribeのcallメソッドを眺めてみます。

Subscriber<? super T> st = hook.onLift(operator).call(o);

について。

hook.onLift(operator)*2operator(= new OperatorMap<T, R>(func) のインスタンス)を返すので、この行は、OperatorMap#map(Subscriber o)となります。

では、OperatorMapを見てみます。

OperatorMapmapメソッドの引数に与えたFunc1transformerというプロパティで持っている。

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

OperatorMap#call(final Subscriber<? super R> o) は下記。

@Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

おそらく前のObservableで値を発していたsubscriberが受け継がれる形かなにかでoとして与えられて、 それを元に新しい Subscriber を作って返している。

そのsubscriberのonNext()メソッドを見てみると

@Override
public void onNext(T t) {
    try {
        o.onNext(transformer.call(t));
    } catch (Throwable e) {
        Exceptions.throwOrReport(e, this, t);
    }
}

となっていて、元から受け取った値をObserverに渡す前にtransformerのcallメソッドをかまして変換するもの、となっている。

それでは、このsubscriberを受け取っているObservable.liftメソッドに戻る。

onSubscribe.call(st);

の行に戻る。onSubscribeは元のObservableのフィールドが持つOnSubscribeインスタンス

なので、新しい返り値として作ったObservablecallでは、先ほど見た変換用の関数をかませるsubscriberを与えて元のObservableOnSubscribecallを呼び出す、 みたいなことになっているのではないかな。

*1:Decoratorパターンっぽい

*2:というより、hook.onXXXメソッドは基本的にログ等のためにカスタマイズしたい人用のクラスであるらしく、基本的にはメソッドの引数に渡したインスタンスがそのまま返ってくるようです https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java#L26-L27