Encadenar dos observables de adaptación con RxJava

Quiero ejecutar 2 llamadas de red una tras otra. Ambas llamadas de red devuelven Observable. La segunda llamada utiliza datos del resultado exitoso de la primera llamada, método en el resultado exitoso de la segunda llamada usa datos de ambos resultados satisfactorios de la primera y de la segunda llamada. También debería ser capaz de manejar ambos eventos onError "diferente". ¿Cómo puedo lograr este evitar el infierno de devolución de llamada como en el ejemplo a continuación:

API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<AuthResponse>() { @Override public void call(final AuthResponse authResponse) { API().getUser(authResponse.getAccessToken()) .subscribe(new Action1<List<User>>() { @Override public void call(List<User> users) { doSomething(authResponse, users); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { onErrorGetUser(); } }); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { onErrorAuth(); } }); 

Sé acerca de zip, pero quiero evitar la creación de "clase Combiner".

Actualización 1. Intentó implementar la respuesta de akarnokd:

  API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(authResponse -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> { getView().setError(processFail(throwable)); }), ((authResponse, users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } else { getView().setError(R.string.something_went_wrong); } })); 

Sin embargo dentro del compilador del método flatMap dice que no puede resolver los métodos de authResponse y los usuarios ( authResponse.getAccessToken() , users.get(0) etc). Im nuevo a la programación de rx y lambdas – por favor dime cuál es el problema. De todas formas el código parece mucho más limpio ahora.

Actualización 2.

 API() .auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .doOnError(throwable -> getView().setError(processFail(throwable))) .flatMap((AuthResponse authResponse) -> API() .getUser(authResponse.getAccessToken()) .doOnError(throwable -> getView().setError(processFail(throwable))), ((AuthResponse authResponse, List<User> users) -> { // Ensure returned user is the which was authenticated if (authResponse.getUserId().equals(users.get(0).getId())) { SessionManager.getInstance().initSession(email, password, authResponse.getAccessToken(), users.get(0)); getView().toNews(); } return Observable.just(this); })); 

Lo he hecho así, pero ahora mis llamadas de red no se ejecutan en absoluto.

Además de la respuesta de Anthony R., hay una sobrecarga flatMap que toma un Func2 y empareja sus valores primarios y aplastados para usted. Además, observe los operadores onErrorXXX y onExceptionXXX para la manipulación de errores y enciéndalos con su primer y segundo Observables

 first.onErrorReturn(1) .flatMap(v -> service(v).onErrorReturn(2), (a, b) -> a + b); 

¿Has mirado en flatMap ()? Si su aversión a él (o zip ()) es la necesidad de hacer una clase innecesaria para sostener dos objetos, android.util.Pair puede ser una respuesta. No estoy seguro de cómo obtener exactamente el manejo de errores que está buscando, sin embargo.

  API().auth(email, password) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<AuthResponse, Observable<List<User>>>() { @Override public Observable<List<User>> call(AuthResponse authResponse) { return API().getUser(authResponse.getAccessToken()); } }, new Func2<AuthResponse, List<User>, Pair<AuthResponse, List<User>>>() { @Override public Pair<AuthResponse, List<User>> call(AuthResponse authResponse, List<User> users) { return new Pair<>(authResponse, users); } }).subscribe(new Action1<Pair<AuthResponse, List<User>>>() { @Override public void call(Pair<AuthResponse, List<User>> pair) { doSomething(pair.first, pair.second); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { // not sure how to tell which one threw the error } }); 
  • RxJava: "java.lang.IllegalStateException: Sólo un suscriptor permitido!"
  • ¿Cómo utilizar Rx para gestionar múltiples observadores con el fin de mantener una sola conexión abierta a un servicio?
  • Si se realiza la llamada onComplete para un Asunto RxJava, ¿tengo que anular la suscripción manualmente de nuevo?
  • RxJava como autobús de eventos?
  • Cuándo cancelar una suscripción
  • Comprobación de Internet, dónde colocar al usar MVP, RX y Retrofit
  • Manejar paginación con RxJava
  • ¿Tenemos alguna posibilidad de detener la solicitud en OkHttp Interceptor?
  • Utilizar la programación con RxAndroid
  • Rxandroid pide que se ejecute en el hilo ui aunque esté suscrito en AndroidSchedulers.mainThread ()
  • RxJava dividir uno Observable a dos subObservables
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.