Manejo de excepciones API en RxJava

Estoy tratando de envolver mi cabeza alrededor de RxJava actualmente, pero estoy teniendo un pequeño problema con el manejo de las excepciones de llamadas de servicio de una manera elegante.

Básicamente, tengo un servicio (Retrofit) que devuelve un Observable<ServiceResponse> . ServiceResponse se define de la siguiente manera:

 public class ServiceResponse { private int status; private String message; private JsonElement data; public JsonElement getData() { return data; } public int getStatus() { return status; } public String getMessage() { return message; } } 

Ahora lo que quiero es mapear esa respuesta genérica a una List<Account> contenida dentro del campo JsonElement de datos (supongo que no importa qué aspecto tenga el objeto Account , así que no contaminaré la publicación con él). El código siguiente funciona muy bien para el caso de éxito, pero no puedo encontrar una forma agradable de manejar mis excepciones API:

 service.getAccounts() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .map(new Func1<ServiceResponse, AccountData>() { @Override public AccountData call(ServiceResponse serviceResponse) { // TODO: ick. fix this. there must be a better way... ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); switch (responseType) { case SUCCESS: Gson gson = new GsonBuilder().create(); return gson.fromJson(serviceResponse.getData(), AccountData.class); case HOST_UNAVAILABLE: throw new HostUnavailableException(serviceResponse.getMessage()); case SUSPENDED_USER: throw new SuspendedUserException(serviceResponse.getMessage()); case SYSTEM_ERROR: case UNKNOWN: default: throw new SystemErrorException(serviceResponse.getMessage()); } } }) .map(new Func1<AccountData, List<Account>>() { @Override public List<Account> call(AccountData accountData) { Gson gson = new GsonBuilder().create(); List<Account> res = new ArrayList<Account>(); for (JsonElement account : accountData.getAccounts()) { res.add(gson.fromJson(account, Account.class)); } return res; } }) .subscribe(accountsRequest); 

¿Hay una mejor manera de hacer esto? Esto funciona, onError disparará a mi observador, y voy a recibir el error que he lanzado, pero definitivamente no parece que estoy haciendo esto bien.

¡Gracias por adelantado!

Editar:

Permítanme aclarar exactamente lo que quiero lograr:

Quiero tener una clase que se puede llamar desde la interfaz de usuario (por ejemplo, una actividad, o fragmento, o lo que sea). Esa clase tomaría un Observer<List<Account>> como un parámetro así:

 public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) { ... } 

Ese método devolvería una suscripción que se puede cancelar cuando la interfaz de usuario se separa / destruye / etc.

El observador parametrizado manejaría enNext para las respuestas exitosas que pasan en una lista de Cuentas. OnError manejaría cualquier excepción, pero también recibiría cualquier excepción API (por ejemplo, si el estado de respuesta! = 200 crearíamos un Throwable y lo pasaríamos a onError). Idealmente, no quiero simplemente "lanzar" la Excepción, quiero pasarla directamente al Observador. Eso es lo que todos los ejemplos que veo hacer.

La complicación es que mi servicio Retrofit devuelve un objeto ServiceResponse , por lo que mi observador no puede suscribirse a eso. Lo mejor que he inventado es crear una envoltura Observer alrededor de mi observador, así:

 @Singleton public class AccountsDatabase { private AccountsService service; private List<Account> accountsCache = null; private PublishSubject<ServiceResponse> accountsRequest = null; @Inject public AccountsDatabase(AccountsService service) { this.service = service; } public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) { ObserverWrapper observerWrapper = new ObserverWrapper(observer); if (accountsCache != null) { // We have a cached value. Emit it immediately. observer.onNext(accountsCache); } if (accountsRequest != null) { // There's an in-flight network request for this section already. Join it. return accountsRequest.subscribe(observerWrapper); } if (accountsCache != null && !forceRefresh) { // We had a cached value and don't want to force a refresh on the data. Just // return an empty subscription observer.onCompleted(); return Subscriptions.empty(); } accountsRequest = PublishSubject.create(); accountsRequest.subscribe(new ObserverWrapper(new EndObserver<List<Account>>() { @Override public void onNext(List<Account> accounts) { accountsCache = accounts; } @Override public void onEnd() { accountsRequest = null; } })); Subscription subscription = accountsRequest.subscribe(observerWrapper); service.getAccounts() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); return subscription; } static class ObserverWrapper implements Observer<ServiceResponse> { private Observer<List<Account>> observer; public ObserverWrapper(Observer<List<Account>> observer) { this.observer = observer; } @Override public void onCompleted() { observer.onCompleted(); } @Override public void onError(Throwable e) { observer.onError(e); } @Override public void onNext(ServiceResponse serviceResponse) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); switch (responseType) { case SUCCESS: Gson gson = new GsonBuilder().create(); AccountData accountData = gson.fromJson(serviceResponse.getData(), AccountData.class); List<Account> res = new ArrayList<>(); for (JsonElement account : accountData.getAccounts()) { res.add(gson.fromJson(account, Account.class)); } observer.onNext(res); observer.onCompleted(); break; default: observer.onError(new ApiException(serviceResponse.getMessage(), responseType)); break; } } } } 

Todavía siento que no estoy usando esto correctamente sin embargo. Definitivamente no he visto a nadie más usando un ObserverWrapper antes. Tal vez no debería estar usando RxJava, aunque los chicos de SoundCloud y Netflix realmente me vendieron en ella en sus presentaciones y estoy muy ansioso por aprenderlo.

Por favor, lea a continuación He añadido una edición.

Es perfectamente correcto tirar dentro de un Action / Func / Observer con RxJava. La excepción se propagará por el marco hasta el observador. Si se limita a llamar a onError sólo entonces se retorcer a ti mismo para que eso suceda.

Con eso se dice una sugerencia sería simplemente eliminar este envoltorio y añadir una acción de validación simple dentro de la cadena service.getAccount … de Observables.

Utilizaría el doOnNext (nuevo ValidateServiceResponseOrThrow) encadenado con un mapa (nuevo MapValidResponseToAccountList). Esas son clases simples que implementa el código necesario para mantener la cadena de Observable un poco más legible.

Aquí está su método de loadAccount simplificado usando lo que sugerí.

 public Subscription loadAccounts(Observer<List<Account>> observer, boolean forceRefresh) { if (accountsCache != null) { // We have a cached value. Emit it immediately. observer.onNext(accountsCache); } if (accountsRequest != null) { // There's an in-flight network request for this section already. Join it. return accountsRequest.subscribe(observer); } if (accountsCache != null && !forceRefresh) { // We had a cached value and don't want to force a refresh on the data. Just // return an empty subscription observer.onCompleted(); return Subscriptions.empty(); } accountsRequest = PublishSubject.create(); accountsRequest.subscribe(new EndObserver<List<Account>>() { @Override public void onNext(List<Account> accounts) { accountsCache = accounts; } @Override public void onEnd() { accountsRequest = null; } }); Subscription subscription = accountsRequest.subscribe(observer); service.getAccounts() .doOnNext(new ValidateServiceResponseOrThrow()) .map(new MapValidResponseToAccountList()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); return subscription; } private static class ValidateResponseOrThrow implements Action1<ServiceResponse> { @Override public void call(ServiceResponse response) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); if (responseType != SUCCESS) throw new ApiException(serviceResponse.getMessage(), responseType)); } } private static class MapValidResponseToAccountList implements Func1<ServiceResponse, List<Account>> { @Override public Message call(ServiceResponse response) { // add code here to map the ServiceResponse into the List<Accounts> as you've provided already } } 

Editar: A menos que alguien diga lo contrario, creo que es la mejor práctica devolver errores usando flatMap. He lanzado Excepciones de Acción en el pasado, pero no creo que sea la forma recomendada.

Usted tendrá una pila de excepción de limpieza si utiliza flatMap. Si tira de dentro de una acción, la pila Exception contendrá realmente rx.exceptions.OnErrorThrowable$OnNextValue Excepción que no es ideal.

Permítanme demostrar el ejemplo anterior usando el flatMap en su lugar.

 private static class ValidateServiceResponse implements rx.functions.Func1<ServiceResponse, Observable<ServiceResponse>> { @Override public Observable<ServiceResponse> call(ServiceResponse response) { ResponseTypes responseType = ResponseTypes.from(serviceResponse.getStatus()); if (responseType != SUCCESS) return Observable.error(new ApiException(serviceResponse.getMessage(), responseType)); return Observable.just(response); } } service.getAccounts() .flatMap(new ValidateServiceResponse()) .map(new MapValidResponseToAccountList()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(accountsRequest); 

Como puedes ver, la diferencia es sutil. El ValidateServiceResponse ahora implementa el Func1 lugar de Action1 y ya no estamos usando la palabra clave throw . Utilizamos Observable.error(new Throwable) lugar. Creo que esto encaja mejor con el contrato Rx esperado.

Usted podría leer este buen artículo sobre el manejo de errores http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/

  • Progreso de descarga con RxJava, OkHttp y Okio en Android
  • RxJava - subir archivos secuencialmente - emitir el siguiente elemento, cuando onNext llamado
  • En RxJava, cómo reintentar / reanudar en caso de error, en lugar de completar el observable
  • Cómo cancelar la solicitud con retofit2 y RxAndroid
  • Unidad de prueba de la aplicación android con retrofit y rxjava
  • RxJava: encadenamiento de observables
  • rxjava interval () tiempo de reinicio después de algún evento
  • RxJava file.createNewFile () devuelve siempre TRUE
  • CalledFromWrongThreadException incluso cuando se utiliza AndroidSchedulers.mainThread ()
  • Suscribir un cajón de navegación a un Observable
  • Android Pros y Contras: Event Bus y RxJava
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.