Cómo hacer que flatMap se ejecute en un subproceso de fondo
Estoy utilizando Retrofit y RxJava para realizar algunas tareas de fondo. El código se ve así:
public class MyLoader{ public Observable<MyData> getMyData(){ return setupHelper().flatMap(new Func1<MyHelper, Observable<MyData>>() { @Override public Observable<MyData> call(MyHelper myHelper) { return queryData(myHelper); } }); } private Observable<MyData> queryData(MyHelper myHelper){ ... } private Observable<MyHelper> setupHelper(){ return Observable.create(new Observable.OnSubscribe<MyHelper>() { @Override public void call(final Subscriber<? super MyHelper> subscriber) { try{ MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data subscriber.onNext(helper); subscriber.onCompleted(); }catch(RetrofitError e){ subscriber.onError(e) } } } } }
Esto falla con RetrofitError, debido a la excepción de NetworkOnMainThread
en esta línea:
- Uso de "skipWhile" combinado con "repeatWhen" en RxJava para implementar el sondeo de servidor
- RxAndroid: cambios de interfaz de usuario en el subproceso Schedulers.io ()
- CalledFromWrongThreadException incluso cuando se utiliza AndroidSchedulers.mainThread ()
- RxJava onCompleted y onTerminate en el hilo principal
- Solicitudes HTTP periódicas con RxJava y Retrofit
MyHelper helper = makeRetrofitCall();//Using Retrofit blocking call to get some data
Suscribirse a mi Observable:
myLoader.getMyData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<MyData>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(MyData inventory) { } });
De acuerdo con la documentación de Rx, flatMap
no funciona en ningún hilo de fondo. Mi pregunta es cómo puedo asegurar que todo el método getMyData()
se ejecute en segundo plano.
- Inter fragmento de comunicación utilizando rxjava
- Usando RxJava para encadenar una serie de operaciones - ¿a dónde ir después?
- Cómo manejar el `IllegalArgumentException` en RxJava?
- ¿Qué hilo es unSubscribeOn llamado? ¿Deberíamos llamarlo?
- Cómo cancelar la solicitud con retofit2 y RxAndroid
- Android Rxjava suscribirse a un cambio de variable
- ¿Cómo reconstruir o restablecer caché Observable, utilizado con Retrofit para obtener nuevos datos?
- Cancelación de varias suscripciones a la vez en RxAndroid - Android
Acabo de agregar observeOn(Schedulers.newThread())
antes de flatMap
y funciona!
Esto se mueve sólo un paso en la tubería para el hilo de fondo:
Observable<Integer> vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val));
Respuesta original aquí: https://stackoverflow.com/a/35429084/2908525
Hay una buena probabilidad cuando se crea el objeto MyLoader en el subproceso principal que el Observable.create se ejecute también (o tal vez en algún otro lugar antes en su código (?)). Si es así, el .subscribeOn(Schedulers.io())
no tendrá ningún efecto en cambiar el hilo.
Puede intentar envolver el .create () con un .defer()
para asegurarse de que el observable se crea sólo cuando se suscribe.
Por ejemplo, defer(() -> create(....))
- ¿Cómo puedo editar el código fuente de android os teclado?
- Cómo agregar la función de pausa y reanudar mientras recodifica vídeo en android