RxJavaのonSubscribeメソッドで指定したスレッドはどこで生成されて起動されているのか
この記事は、RxJava Advent Calendar 2015 の12月15日分の記事です。
非同期処理についてよくわからなかったため、下記の本を読んでみたところ、スレッドで処理が行われるには、
が必要だと分かりました。
増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編
- 作者: 結城浩
- 出版社/メーカー: ソフトバンククリエイティブ
- 発売日: 2006/03/21
- メディア: 大型本
- 購入: 15人 クリック: 287回
- この商品を含むブログ (204件) を見る
Android開発を行っている際、RxJava
を利用すると、HTTP通信等の非同期処理を非常に簡単に記述することが出来ます。
私個人の話となりますが、私自身はその場の勢いでいきなりRxJava
やその他もろもろ流行りなあれこれが残されたAndroid
のプロジェクトに飛び込んでいる状況のため、非同期処理をそれと認識して書く最初のお仕事がRxJava
を使ったものとなりまして。
本当は基本であるはずのスレッドのインスタンスであるとか、複数のスレッドから利用されるインスタンスでロックを取って排他制御を行ったり、データの受け渡し役のクラスがいたり、といった話がとても面白かったのでした。
その流れで、RxJava
のコードでもスレッドのインスタンスの生成とスレッドの起動をしている箇所があるはずなのだから、試しに少し追って見ようか、とRxJava
でsubscribeOn
スレッドを作成して起動させている部分を眺めてみました。
本当は他の部分も読んでいたのですが、うまくまとめきれず subscribeOn
の部分だけで大変な分量になってしまったため、本記事ではsubscribeOn
まわりだけで勘弁させていただきます。
また、書いている人は、処理が行われるスレッドが生成、起動(、停止)されるタイミングを大まかに知りたかっただけなので、あらかじめご了承ください。
詳しい人は、教えてください...。
まとめ
Observable.subscribeOn
メソッドlift
メソッドでOperatorSubscribeOn
オペレータで元のObservable
に何かするみたいです
OperatorSubscribeOn
クラスworker.schedule()
でよびだされるAction0#call
メソッド内で、OperatorSubscribeOn#call()
メソッドで生成しているsubscriber
が受け取っているObservable
のunsafeSubscribe
が実行されていますよunsafeSubscribe
の中でOperatorSubscribeOn
のフィールドのsubscriber
(=元を辿れば.subscribeOn
を呼び出したObservable
の)のonNext()
が行われてますよunsafeSubscribe
の中に、OperatorSubscribeOn#call()
メソッドの引数として与えられているsubscriber
が持ち込まれているっぽいですよ
では、以下に長々とコードを眺めていきます。
Observable.subscribeOnメソッド
まず、Observable.subscribeOn
のコードを探したらこちらでした。
public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return nest().lift(new OperatorSubscribeOn<T>(scheduler)); }
今回追いたい*1のは非同期処理の場合なので、おそらく
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
こちらだと思います*2。
lift()
ではまず、与えたOperator
を使ってsubscriber
を作成します。
そして、そのsubscriber
を引数に、元のObservable
のOnSubscribe#call
メソッドを呼び出すという感じの中身のObservable
を生成して返す、
つまり引数に与えられたOperator
を使ってちょいと衣を足した新しいObservable
を返す、といったことをしています*3。
OperatorSubscribeOnクラス
それをふまえて、OperatorSubscribeOn
を覗いてみます。まず、コンストラクタから。
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> { private final Scheduler scheduler; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; }
scheduler
をprivate
変数に代入しているみたいですね。
では次に、subscriber
を作成しているcall()
メソッドの方を見てみます。
@Override public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { @Override public void onCompleted() { // ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (Thread.currentThread() == t) { // don't schedule if we're already on the thread (primarily for first setProducer call) // see unit test 'testSetProducerSynchronousRequest' for more context on this producer.request(n); } else { inner.schedule(new Action0() { @Override public void call() { producer.request(n); } }); } } }); } }); } }); } }; } }
.........。
長いですね。
気になるところがたくさんありますが、事前に少し読んだところ、まさにここでスレッド立ててますよ、という意味ではっきり関係あると理解できたのは、 下記の2カ所です。
final Worker inner = scheduler.createWorker();
@Override public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) {
少ないですね*4。スレッドの生成と起動のタイミングを大雑把に把握さえできれば今回の目的はそれで良いのです。
scheduler.createWorker()によるワーカーの作成
final Worker inner = scheduler.createWorker();
では与えられたscheduler
に対応しているWorker
を作っていて、subscribeOn
に関して、Threadの生成に近いことが行われている、と思っているのがここです。
大体Schedulers.newThread()
を指定している場合は、下記のクラスのインスタンスが生成されます。
public final class NewThreadScheduler extends Scheduler { // ... @Override public Worker createWorker() { return new NewThreadWorker(THREAD_FACTORY); }
このNewThreadWorker
のコンストラクタ内でScheduledExecutorService
クラス(スレッド(Runnable)の処理を実行する働きのクラス)のインスタンスが作成されます。
public class NewThreadWorker extends Scheduler.Worker implements Subscription { // ... public NewThreadWorker(ThreadFactory threadFactory) { ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); // スレッドを起動させるためのインスタンス GETだぜ! // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak boolean cancelSupported = tryEnableCancelPolicy(exec); if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) { registerExecutor((ScheduledThreadPoolExecutor)exec); } schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook(); executor = exec; }
worker.schedule()による別スレッドでの実行命令
次に、inner.schedule(Action0 ...
の方を見てみます。
そういえば深くは追っていませんが、Action..というのはsubscribe
メソッドに渡される処理内容を表すクラスみたいです。
さて、このinner(NewThreadWorker).schedule()
メソッドではThreadの起動を行っています。
NewThreadWorker
クラスに戻りましょう。
// NewThreadWorker @Override public Subscription schedule(final Action0 action) { return schedule(action, 0, null); } // もう1個オーバロードしたメソッドをはさみつつ @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { if (isUnsubscribed) { return Subscriptions.unsubscribed(); } return scheduleActual(action, delayTime, unit); } public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = schedulersHook.onSchedule(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f); return run; }
scheduleActual
メソッドの中で、executor(スレッドを管理する働きのクラス)
のsubmit()
メソッドやschedule()
メソッドが呼ばれています*5。
schdule()
メソッドは指定された時間後に、submit()
は遅延時間0でタスク(run = いくつか設定等が追加されたactionの内容)を実行するといった感じの関数です*6。
なので、ここで別のスレッドで処理をおっぱじめるぞ、と書いているみたいですね。
worker.schedule()でよびだされる Action0 callメソッド内でObservableのunsafeSubscribe実行
さて、inner.schedule(Action0 ...
の行で別のスレッドに処理を投げて実行させる命令が走っていることがわかったので、今度はその内容として与えられている、Action0
の方を見てみます。
inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void setProducer(final Producer producer) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (Thread.currentThread() == t) { // don't schedule if we're already on the thread (primarily for first setProducer call) // see unit test 'testSetProducerSynchronousRequest' for more context on this producer.request(n); } else { inner.schedule(new Action0() { @Override public void call() { producer.request(n); } }); } } }); } }); } });
たぶん、schedule
メソッドでは、Action0
のcall
メソッドが実行されるのでしょう。
Action0
のcall
メソッドの中で注目したいのが、まずここです。
public void onNext(final Observable<T> o) { inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); o.unsafeSubscribe(new Subscriber<T>(subscriber) {
Observable
のunsafeSubscribe
メソッドをschedule
メソッドの引数のAction0()
のcall()
メソッドの中で呼びだしています。
ということは、これはshcdule
メソッドを呼び出しているインスタンスのスレッドで、このObservable
の処理が開始されるのでしょうか。
そんな感じで別スレッドで行われていそうな、unsafeSubscribe
メソッドの中で生成されているsubscriber
のonNext
メソッドの中を見てみます。
subscriber.onNext
を呼び出しています。
@Override public void onNext(T t) { subscriber.onNext(t); }
ここの、subscriber
はOperatorSubscribeOn#call()
で作成しているSubscriber
のフィールドのsubscriber
であり、ということはsubscribeOn
メソッドの呼び出し元で作っているOnSubscribe
からcall()
メソッドの引数として与えられたsubscriber
みたいです。
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); return new Subscriber<Observable<T>>(subscriber) { // Observable#liftメソッドより。上のsubscriber = 下記のo. Subscriber<? super T> st = hook.onLift(operator).call(o); // hook.onLift(operator) = operator. hookはデフォルトだと何もしない
ということは、このsubscriber
は呼び出し元のsubscriber
の値を受け取って動くOnSubscribe
のcall()
の引数です。
変換したり、関数の引数の形で色々して別スレッドで行う関数の中までsubscriber
とsubscriber
が持っているObservable
が放出された値を持ってきているっぽいですね。
最後にObservable.lift
メソッドを見て確認しておきます。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return new Observable<R>(new OnSubscribe<R>() { @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = hook.onLift(operator).call(o); // hook.onLift(operator) = operator. hookはデフォルトだと何もしない try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); onSubscribe.call(st); // Operatorで新しく作ったsubscriberがcallメソッドに渡される } catch (Throwable e) { // localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then // prevents onErrorResumeNext and other similar approaches to error handling Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); // if the lift function failed all we can do is pass the error to the final Subscriber // as we don't have the operator available to us o.onError(e); } } }); }
長くなりましたね...。もう一度になりますが、こちらにもまとめを載せさせてください。
まとめ
Observable.subscribeOn
メソッドlift
メソッドでOperatorSubscribeOn
オペレータで元のObservable
に何かするみたいです
OperatorSubscribeOn
クラスworker.schedule()
でよびだされるAction0#call
メソッド内で、OperatorSubscribeOn#call()
メソッドで生成しているsubscriber
が受け取っているObservable
のunsafeSubscribe
が実行されていますよunsafeSubscribe
の中でOperatorSubscribeOn
のフィールドのsubscriber
(=元を辿れば.subscribeOn
を呼び出したObservable
の)のonNext()
が行われてますよunsafeSubscribe
の中に、OperatorSubscribeOn#call()
メソッドの引数として与えられているsubscriber
が持ち込まれているっぽいですよ
ところで、上記をふまえると、
OnSubscribe
に渡されるsubscriber
のonNext()
メソッドはsubscribe()
メソッドを呼び出した時点から実行され始めるので、onSubscribe()
メソッドで指定されたスレッドは、subscribe()
メソッドが呼ばれた時点で生成されていそう。- また、起動も
subscribe()
メソッドでObservable
が値を放出し始めたタイミングでschedule()
メソッドが呼ばれて実行されていそう
という感じがしました。
実は、自分自身はPresenter
にObservable
の処理を書いて、Activity
などからPresenter
のメソッドで作成したObservable
を呼び出してsubscribe
する書き方が癖なのですが、そうすると画面が表示されているタイミングと処理を行うスレッドが生成、(unsubscribe()メソッド等で)停止されるタイミングは画面と密に関わってますね。
画面とは関係無しに1つのスレッドに処理を投げて、その特定のスレッドから結果や進捗を受け取りたい、ということがしたい場合にはIntentService
を使った方がいいのかな...勉強だー。
*1:追いきれるとは限らない
*2:もう少しきちんと確かめられれば良いのですが...
*3: もう少し綺麗な言い方だとストリームを変換していると言うのだと思います。 OperatorMap(mapメソッドで使われる)の場合について http://woshidan.hatenablog.com/entry/2015/12/13/205131
*4:...うっ...
*5:返り値のFuture...と出ているのは、まだ処理が完了していない場合に仮の値を、終わった場合に仮の値と同じように扱える本物の値を返しているというくらいの感じだと思います。非同期処理を引き受けましたよ、いつぐらいに見にきてね的な値を先もって渡すような処理の書き方があり、それをFutureパターンとか、いつぐらいに見にきてねと渡される値を表すクラスをFuture役といったりするそうです
*6:ScheduledActionを見ていると runは、キャンセルされたときにどうするか、といった部分でいくつか設定した処理を持っておく、といった感じに見えますが詳しくは分からないです...