RxJavaのmapのコードを眺めてみた
3行まとめ
- mapメソッドを呼び出すと新しいObservableが生成されて返ってくる*1
- 新しいObservableのOnSubscribe#callメソッドは、元のObservableのOnSubscribe#callメソッドに対して、新しく作ったsubscriberを与えて呼び出す、というもの
詳細
まず、Observable.map(Func1<? super T, ? extends R> func)
メソッドはこちら。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return lift(new OperatorMap<T, R>(func)); }
じゃあ、lift
メソッドはどうなっているかというと、
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }
おう。
Observableの型がOperatorを受け取って、Operatorの返す型を(subscriberが)受け取るObservableに作り直している感じです。
そのとき、新しく作られるObservableのOnSubscribeのcall
メソッドを眺めてみます。
Subscriber<? super T> st = hook.onLift(operator).call(o);
について。
hook.onLift(operator)
は*2、
operator(= new OperatorMap<T, R>(func) のインスタンス)
を返すので、この行は、OperatorMap#map(Subscriber o)
となります。
では、OperatorMap
を見てみます。
OperatorMap
はmap
メソッドの引数に与えたFunc1
をtransformer
というプロパティで持っている。
public final class OperatorMap<T, R> implements Operator<R, T> { private final Func1<? super T, ? extends R> transformer; public OperatorMap(Func1<? super T, ? extends R> transformer) { this.transformer = transformer; }
OperatorMap#call(final Subscriber<? super R> o)
は下記。
@Override public Subscriber<? super T> call(final Subscriber<? super R> o) { return new Subscriber<T>(o) { @Override public void onCompleted() { o.onCompleted(); } @Override public void onError(Throwable e) { o.onError(e); } @Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwOrReport(e, this, t); } } }; }
おそらく前のObservableで値を発していたsubscriberが受け継がれる形かなにかでo
として与えられて、
それを元に新しい Subscriber
を作って返している。
そのsubscriberのonNext()
メソッドを見てみると
@Override public void onNext(T t) { try { o.onNext(transformer.call(t)); } catch (Throwable e) { Exceptions.throwOrReport(e, this, t); } }
となっていて、元から受け取った値をObserverに渡す前にtransformerのcallメソッドをかまして変換するもの、となっている。
それでは、このsubscriber
を受け取っているObservable.lift
メソッドに戻る。
onSubscribe.call(st);
の行に戻る。onSubscribe
は元のObservableのフィールドが持つOnSubscribe
のインスタンス。
なので、新しい返り値として作ったObservable
のcall
では、先ほど見た変換用の関数をかませるsubscriberを与えて元のObservable
のOnSubscribe
のcall
を呼び出す、
みたいなことになっているのではないかな。