I have a use case for creating a new Observable:
```java
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        final RequestFuture<String> futureRequest = RequestFuture.newFuture();
        try {
            // getResult() is a synchronous, time-consuming HTTP call
            String response = getResult();
            subscriber.onNext(response);
            subscriber.onCompleted();
            Log.e("call method", "Thread is about to end " + Thread.currentThread().getId());
        } catch (Exception e) {
            subscriber.onError(e);
        }
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
```

As you can see, getResult() is a time-consuming HTTP call that I am making synchronously. I subscribe on the io thread and observe on Android's main thread.
When I retrieve the observable and subscribe to it:

```java
subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {
        Log.e("on completed method", "Thread is ending " + Thread.currentThread().getId());
    }

    @Override
    public void onError(Throwable e) {
        Toast.makeText(getContext(), "failed!", Toast.LENGTH_LONG).show();
    }

    @Override
    public void onNext(String s) {
        Log.e("on Next method", "Thread is about to end " + Thread.currentThread().getId());
    }
});
```

The strange thing is that call() runs on a thread whose id is 372, while the subscriber's onNext() runs on thread 1, which I think is Android's UI thread.

So how does RxJava actually handle this thread change? And does it prove that the subscriber passed to call() is not the one I pass to subscribe()?
Best answer
By default, a subscriber's onNext() runs on the same thread that calls .subscribe().
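To illustrate the default case, here is a minimal plain-Java sketch. It does not use RxJava: `DefaultThreadSketch` and its `subscribe` method are hypothetical stand-ins for an Observable with no scheduler applied, where the producer simply calls the consumer directly.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for an Observable with no subscribeOn/observeOn applied.
class DefaultThreadSketch {

    // The "producer": emits one value by calling the consumer directly.
    static void subscribe(Consumer<String> onNext) {
        onNext.accept("response");
    }

    // Returns true when the consumer ran on the thread that called subscribe().
    static boolean sameThread() {
        Thread caller = Thread.currentThread();
        Thread[] seen = new Thread[1];
        subscribe(value -> seen[0] = Thread.currentThread());
        return seen[0] == caller;
    }

    public static void main(String[] args) {
        System.out.println("onNext ran on the calling thread: " + sameThread());
    }
}
```

With no hand-off in between, the callback is just a nested method call, so it cannot end up on any other thread.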
In your code, however, there are two thread changes:

The first is subscribeOn(Schedulers.io()), which runs the .create() body and the rest of the chain on an io thread. After that comes observeOn(AndroidSchedulers.mainThread()), which moves all subsequent operations to the main thread, including the callbacks you pass to .subscribe().
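The two hops can be sketched in plain Java with executors. This is only an illustration of the idea, not RxJava's actual implementation: the class `ThreadHopSketch`, the thread names `io-thread` and `main-thread`, and the `wrapper` consumer are all assumptions made for the example. It also touches on the second part of the question: in RxJava 1.x, operators such as observeOn wrap the downstream subscriber, so the object handed to call() is generally a wrapper that forwards to yours rather than your own instance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java sketch of the two thread hops; NOT how RxJava is implemented internally.
class ThreadHopSketch {

    /** Thread names observed on each side of the hop. */
    static final class Result {
        final String producerThread;
        final String consumerThread;

        Result(String producerThread, String consumerThread) {
            this.producerThread = producerThread;
            this.consumerThread = consumerThread;
        }
    }

    static Result run() {
        // Stand-ins (assumptions): "io" plays Schedulers.io(),
        // "main" plays the Android main thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        String[] seen = new String[2];
        CountDownLatch done = new CountDownLatch(1);

        // The consumer you would pass to subscribe().
        Consumer<String> yourSubscriber = value -> {
            seen[1] = Thread.currentThread().getName();
            done.countDown();
        };

        // observeOn-like step: a wrapper that re-posts each value to "main".
        // The producer only ever sees this wrapper, never yourSubscriber.
        Consumer<String> wrapper = value -> main.execute(() -> yourSubscriber.accept(value));

        // subscribeOn-like step: run the producer body on "io".
        io.execute(() -> {
            seen[0] = Thread.currentThread().getName();
            wrapper.accept("response"); // stands in for subscriber.onNext(getResult())
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the consumer");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return new Result(seen[0], seen[1]);
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println("producer ran on " + r.producerThread);
        System.out.println("consumer ran on " + r.consumerThread);
    }
}
```

Running main prints `producer ran on io-thread` followed by `consumer ran on main-thread`, which mirrors the thread 372 versus thread 1 observation in the question.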
This article may give you a better understanding of how the two operators work: https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2#.t6uagyarn