woshidan's blog

あいとゆうきとITと、とっておきの話。

RxJavaのonSubscribeメソッドで指定したスレッドはどこで生成されて起動されているのか

この記事は、RxJava Advent Calendar 2015 の12月15日分の記事です。

非同期処理についてよくわからなかったため、下記の本を読んでみたところ、スレッドで処理が行われるには、

が必要だと分かりました。

Android開発を行っている際、RxJavaを利用すると、HTTP通信等の非同期処理を非常に簡単に記述することが出来ます。

私個人の話となりますが、私自身はその場の勢いでいきなりRxJavaやその他もろもろ流行りなあれこれが残されたAndroidのプロジェクトに飛び込んでいる状況のため、非同期処理をそれと認識して書く最初のお仕事がRxJavaを使ったものとなりまして。

本当は基本であるはずのスレッドのインスタンスであるとか、複数のスレッドから利用されるインスタンスでロックを取って排他制御を行ったり、データの受け渡し役のクラスがいたり、といった話がとても面白かったのでした。

その流れで、RxJavaのコードでもスレッドのインスタンスの生成とスレッドの起動をしている箇所があるはずなのだから、試しに少し追って見ようか、とRxJavasubscribeOnスレッドを作成して起動させている部分を眺めてみました。

本当は他の部分も読んでいたのですが、うまくまとめきれず subscribeOnの部分だけで大変な分量になってしまったため、本記事ではsubscribeOnまわりだけで勘弁させていただきます。

また、書いている人は、処理が行われるスレッドが生成、起動(、停止)されるタイミングを大まかに知りたかっただけなので、あらかじめご了承ください。

詳しい人は、教えてください...。

まとめ

  • Observable.subscribeOnメソッド
    • liftメソッドOperatorSubscribeOnオペレータで元のObservableに何かするみたいです
  • OperatorSubscribeOnクラス
  • worker.schedule()でよびだされるAction0#callメソッド内で、OperatorSubscribeOn#call()メソッドで生成しているsubscriberが受け取っているObservableunsafeSubscribeが実行されていますよ
    • unsafeSubscribeの中でOperatorSubscribeOnのフィールドのsubscriber(=元を辿れば.subscribeOnを呼び出したObservableの)のonNext()が行われてますよ
    • unsafeSubscribeの中に、OperatorSubscribeOn#call()メソッドの引数として与えられているsubscriberが持ち込まれているっぽいですよ

では、以下に長々とコードを眺めていきます。

Observable.subscribeOnメソッド

まず、Observable.subscribeOnのコードを探したらこちらでした。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

今回追いたい*1のは非同期処理の場合なので、おそらく

return nest().lift(new OperatorSubscribeOn<T>(scheduler));

こちらだと思います*2

lift()ではまず、与えたOperatorを使ってsubscriberを作成します。

そして、そのsubscriberを引数に、元のObservableOnSubscribe#callメソッドを呼び出すという感じの中身のObservableを生成して返す、 つまり引数に与えられたOperatorを使ってちょいと衣を足した新しいObservableを返す、といったことをしています*3

OperatorSubscribeOnクラス

それをふまえて、OperatorSubscribeOnを覗いてみます。まず、コンストラクタから。

public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

    private final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

schedulerprivate変数に代入しているみたいですね。

では次に、subscriberを作成しているcall()メソッドの方を見てみます。

    @Override
    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

            @Override
            public void onCompleted() {
                // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

                            @Override
                            public void onCompleted() {
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                subscriber.onError(e);
                            }

                            @Override
                            public void onNext(T t) {
                                subscriber.onNext(t);
                            }

                            @Override
                            public void setProducer(final Producer producer) {
                                subscriber.setProducer(new Producer() {

                                    @Override
                                    public void request(final long n) {
                                        if (Thread.currentThread() == t) {
                                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                                            producer.request(n);
                                        } else {
                                            inner.schedule(new Action0() {

                                                @Override
                                                public void call() {
                                                    producer.request(n);
                                                }
                                            });
                                        }
                                    }

                                });
                            }

                        });
                    }
                });
            }

        };
    }
}

.........。

長いですね。

気になるところがたくさんありますが、事前に少し読んだところ、まさにここでスレッド立ててますよ、という意味ではっきり関係あると理解できたのは、 下記の2カ所です。

final Worker inner = scheduler.createWorker();
@Override
public void onNext(final Observable<T> o) {
    inner.schedule(new Action0() {

        @Override
        public void call() {
            final Thread t = Thread.currentThread();
            o.unsafeSubscribe(new Subscriber<T>(subscriber) {

少ないですね*4。スレッドの生成と起動のタイミングを大雑把に把握さえできれば今回の目的はそれで良いのです。

scheduler.createWorker()によるワーカーの作成

final Worker inner = scheduler.createWorker();

では与えられたschedulerに対応しているWorkerを作っていて、subscribeOnに関して、Threadの生成に近いことが行われている、と思っているのがここです。

大体Schedulers.newThread()を指定している場合は、下記のクラスのインスタンスが生成されます。

public final class NewThreadScheduler extends Scheduler {
  // ...
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(THREAD_FACTORY);
    }

このNewThreadWorkerコンストラクタ内でScheduledExecutorServiceクラス(スレッド(Runnable)の処理を実行する働きのクラス)のインスタンスが作成されます。

public class NewThreadWorker extends Scheduler.Worker implements Subscription {
  // ...
    public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // スレッドを起動させるためのインスタンス GETだぜ!
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
        executor = exec;
    }

worker.schedule()による別スレッドでの実行命令

次に、inner.schedule(Action0 ...の方を見てみます。

そういえば深くは追っていませんが、Action..というのはsubscribeメソッドに渡される処理内容を表すクラスみたいです。

さて、このinner(NewThreadWorker).schedule()メソッドではThreadの起動を行っています。

NewThreadWorkerクラスに戻りましょう。

// NewThreadWorker
    @Override
    public Subscription schedule(final Action0 action) {
        return schedule(action, 0, null);
    }

    // もう1個オーバロードしたメソッドをはさみつつ
    @Override
    public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
        if (isUnsubscribed) {
            return Subscriptions.unsubscribed();
        }
        return scheduleActual(action, delayTime, unit);
    }

    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

scheduleActualメソッドの中で、executor(スレッドを管理する働きのクラス)submit()メソッドschedule()メソッドが呼ばれています*5

schdule()メソッドは指定された時間後に、submit()は遅延時間0でタスク(run = いくつか設定等が追加されたactionの内容)を実行するといった感じの関数です*6

なので、ここで別のスレッドで処理をおっぱじめるぞ、と書いているみたいですね。

worker.schedule()でよびだされる Action0 callメソッド内でObservableのunsafeSubscribe実行

さて、inner.schedule(Action0 ...の行で別のスレッドに処理を投げて実行させる命令が走っていることがわかったので、今度はその内容として与えられている、Action0の方を見てみます。

inner.schedule(new Action0() {

    @Override
    public void call() {
        final Thread t = Thread.currentThread();
        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }

            @Override
            public void onNext(T t) {
                subscriber.onNext(t);
            }

            @Override
            public void setProducer(final Producer producer) {
                subscriber.setProducer(new Producer() {

                    @Override
                    public void request(final long n) {
                        if (Thread.currentThread() == t) {
                            // don't schedule if we're already on the thread (primarily for first setProducer call)
                            // see unit test 'testSetProducerSynchronousRequest' for more context on this
                            producer.request(n);
                        } else {
                            inner.schedule(new Action0() {

                                @Override
                                public void call() {
                                    producer.request(n);
                                }
                            });
                        }
                    }

                });
            }

        });
    }
});

たぶん、scheduleメソッドでは、Action0callメソッドが実行されるのでしょう。

Action0callメソッドの中で注目したいのが、まずここです。

            public void onNext(final Observable<T> o) {
                inner.schedule(new Action0() {

                    @Override
                    public void call() {
                        final Thread t = Thread.currentThread();
                        o.unsafeSubscribe(new Subscriber<T>(subscriber) {

ObservableunsafeSubscribeメソッドscheduleメソッドの引数のAction0()call()メソッドの中で呼びだしています。

ということは、これはshcduleメソッドを呼び出しているインスタンスのスレッドで、このObservableの処理が開始されるのでしょうか。

そんな感じで別スレッドで行われていそうな、unsafeSubscribeメソッドの中で生成されているsubscriberonNextメソッドの中を見てみます。

subscriber.onNext を呼び出しています。

@Override
public void onNext(T t) {
    subscriber.onNext(t);
}

ここの、subscriberOperatorSubscribeOn#call()で作成しているSubscriberのフィールドのsubscriberであり、ということはsubscribeOnメソッドの呼び出し元で作っているOnSubscribeからcall()メソッドの引数として与えられたsubscriberみたいです。

    public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        return new Subscriber<Observable<T>>(subscriber) {

// Observable#liftメソッドより。上のsubscriber = 下記のo.
Subscriber<? super T> st = hook.onLift(operator).call(o); // hook.onLift(operator) = operator. hookはデフォルトだと何もしない

ということは、このsubscriberは呼び出し元のsubscriberの値を受け取って動くOnSubscribecall()の引数です。

変換したり、関数の引数の形で色々して別スレッドで行う関数の中までsubscribersubscriberが持っているObservableが放出された値を持ってきているっぽいですね。

最後にObservable.liftメソッドを見て確認しておきます。

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o); // hook.onLift(operator) = operator. hookはデフォルトだと何もしない
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st); // Operatorで新しく作ったsubscriberがcallメソッドに渡される
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

長くなりましたね...。もう一度になりますが、こちらにもまとめを載せさせてください。

まとめ

  • Observable.subscribeOnメソッド
    • liftメソッドOperatorSubscribeOnオペレータで元のObservableに何かするみたいです
  • OperatorSubscribeOnクラス
  • worker.schedule()でよびだされるAction0#callメソッド内で、OperatorSubscribeOn#call()メソッドで生成しているsubscriberが受け取っているObservableunsafeSubscribeが実行されていますよ
    • unsafeSubscribeの中でOperatorSubscribeOnのフィールドのsubscriber(=元を辿れば.subscribeOnを呼び出したObservableの)のonNext()が行われてますよ
    • unsafeSubscribeの中に、OperatorSubscribeOn#call()メソッドの引数として与えられているsubscriberが持ち込まれているっぽいですよ

ところで、上記をふまえると、

  • OnSubscribeに渡されるsubscriberonNext()メソッドsubscribe()メソッドを呼び出した時点から実行され始めるので、onSubscribe()メソッドで指定されたスレッドは、subscribe()メソッドが呼ばれた時点で生成されていそう。
  • また、起動もsubscribe()メソッドObservableが値を放出し始めたタイミングでschedule()メソッドが呼ばれて実行されていそう

という感じがしました。

実は、自分自身はPresenterObservableの処理を書いて、ActivityなどからPresenterメソッドで作成したObservableを呼び出してsubscribeする書き方が癖なのですが、そうすると画面が表示されているタイミングと処理を行うスレッドが生成、(unsubscribe()メソッド等で)停止されるタイミングは画面と密に関わってますね。

画面とは関係無しに1つのスレッドに処理を投げて、その特定のスレッドから結果や進捗を受け取りたい、ということがしたい場合にはIntentServiceを使った方がいいのかな...勉強だー。

*1:追いきれるとは限らない

*2:もう少しきちんと確かめられれば良いのですが...

*3: もう少し綺麗な言い方だとストリームを変換していると言うのだと思います。 OperatorMap(mapメソッドで使われる)の場合について http://woshidan.hatenablog.com/entry/2015/12/13/205131

*4:...うっ...

*5:返り値のFuture...と出ているのは、まだ処理が完了していない場合に仮の値を、終わった場合に仮の値と同じように扱える本物の値を返しているというくらいの感じだと思います。非同期処理を引き受けましたよ、いつぐらいに見にきてね的な値を先もって渡すような処理の書き方があり、それをFutureパターンとか、いつぐらいに見にきてねと渡される値を表すクラスをFuture役といったりするそうです

*6:ScheduledActionを見ていると runは、キャンセルされたときにどうするか、といった部分でいくつか設定した処理を持っておく、といった感じに見えますが詳しくは分からないです...