Cómo hacer que flatMap se ejecute en un subproceso de fondo

Estoy utilizando Retrofit y RxJava para realizar algunas tareas de fondo. El código se ve así:

public class MyLoader{ public Observable<MyData> getMyData(){ return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() { @Override public Observable<MyData> call(MyHelper myHelper) { return queryData(myHelper); } }); } private Observable<MyData> queryData(MyHelper myHelper){ ... } private Observable<MyHelper> setupHelper(){ return Observable.create(new Observable.OnSubscribe<MyHelper>() { @Override public void call(final Subscriber<? super MyHelper> subscriber) { try{ MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data subscriber.onNext(helper); subscriber.onCompleted(); }catch(RetrofitError e){ subscriber.onError(e) } } } } } 

Esto falla con RetrofitError, debido a la excepción de NetworkOnMainThread en esta línea:

  MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data 

Suscribirse a mi Observable:

 myLoader.getMyData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<MyData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MyData inventory) { } }); 

De acuerdo con la documentación de Rx, flatMap no funciona en ningún hilo de fondo. Mi pregunta es cómo puedo asegurar que todo el método getMyData() se ejecute en segundo plano.

Acabo de agregar observeOn(Schedulers.newThread()) antes de flatMap y funciona!

Esto se mueve sólo un paso en la tubería para el hilo de fondo:

 Observable<Integer> vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val)); 

Respuesta original aquí: https://stackoverflow.com/a/35429084/2908525

Hay una buena probabilidad cuando se crea el objeto MyLoader en el subproceso principal que el Observable.create se ejecute también (o tal vez en algún otro lugar antes en su código (?)). Si es así, el .subscribeOn(Schedulers.io()) no tendrá ningún efecto en cambiar el hilo.

Puede intentar envolver el .create () con un .defer() para asegurarse de que el observable se crea sólo cuando se suscribe.

Por ejemplo, defer(() -> create(....))

  • Observable vs Flujo rxJava2
  • RxJava2 función de rebote no funciona correctamente en RecyclerView - Android
  • Progreso de descarga con RxJava, OkHttp y Okio en Android
  • RxJava- validación de formulario RxAndroid en dynamic EditText
  • ¿Cómo esperar múltiples llamadas asincrales anidadas usando RxJava-Android?
  • RxJava se vuelve a suscribir al evento tras la restauración de la actividad
  • Suscribir un cajón de navegación a un Observable
  • ¿Cómo crear un Observable en Android?
  • RxJava / RxAndroid - maneja varios cambios de EditText
  • Android RxJava, sin bloqueo?
  • Solo observable con varios suscriptores
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.