Encadenamiento de observables basado en resultados

Soy un principiante completo en rx-java y rx-android. He oído que la curva de aprendizaje es bastante empinada en el principio.

Im que intenta substituir todo el código basado en Eventbus a una alternativa más tipográfica usando rx-android.

He configurado este fragmento para crear observables desde editar texto cambios de texto:

Actividad principal

RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).subscribe(new Action1<EditText>() { @Override public void call(EditText editText) { searchStopResultFragment.query(editText.getText().toString()); } }); 

RxUtils:

 public static Observable<EditText> createEditTextChangeObservable(final EditText editText){ return Observable.create(new Observable.OnSubscribe<EditText>() { @Override public void call(final Subscriber<? super EditText> subscriber) { editText.addTextChangedListener(new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { } @Override public void onTextChanged(CharSequence s, int start, int before, int count) { } @Override public void afterTextChanged(Editable s) { if (subscriber.isUnsubscribed()) return; subscriber.onNext(editText); } }); } }); } 

SearchStopResultFragment:

 public void query(String query){ lastQuery = query; resultObservable = StopProvider.getStopResultObservable(getActivity().getContentResolver(),query); subscription = resultObservable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Stop>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<Stop> stops) { if(!lastQuery.equals("")) { if(stops.size()>0) { ArrayList<AdapterItem> items = adapter.getItems(); items.clear(); for (Stop stop : stops) { SearchResultStopItem item = new SearchResultStopItem(stop, SearchResultStopItem.STOP); items.add(item); } adapter.setItems(items); adapter.notifyDataSetChanged(); }else{ //DO A NOTHER ASYNC QUERY TO FETCH RESULTS } }else{ showStartItems(); } } }); } 

Se siente como que estoy haciendo esto mal. Creo nuevos observables del método de consulta en mi fragmento en cada evento de cambio de texto. También quiero crear una nueva operación de búsqueda asíncrona basada en el resultado en StopProvider.getStopResultObservable (ver el comentario)

¿Alguna cosa?

Aquí es lo que me ocurrió:

 RxUtils.createEditTextChangeObservable(txtInput) .throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .map(EXTRACT_STRING) .filter(STRING_IS_NOT_EMPTY) .concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(final String query) { return StopProvider.getStopResultObservable(getContentResolver(), query) .map(new Func1<List<Stop>, Pair<String, List<Stop>>>() { // I think this map is a bit more readable than the // combineLatest, and since "query" should not be changing // anyway, the result should be the same (you have to // declare it as final in the method signature, though @Override public Pair<String, List<Stop>> call(List<Stop> stops) { return new Pair(query, stops); } }); } ) .concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() { @Override public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first) .map(new Func1<LocationNameResponse, List<Stop>>() { @Override public List<Stop> call(LocationNameResponse locationNameResponse) { // since there was no if-else in your original code (you were always // just wrapping the List in an Observable) I removed that, too return locationNameResponse.getAddresses(); } }); } else { return Observable.just(queryAndStops.second); } } ) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .compose(this.<List<Stop>>bindToLifecycle()) .subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { // since I don't know what your API is returning I think // it's saver to keep this check in: if (stops != null) { searchStopResultFragment.showStops(stops); } else { searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

dónde:

 public static final Func1<EditText, String> EXTRACT_STRING = new Func1<EditText, String>() { @Override public void String call(EditText editText) { return editText.getText().toString(); } }; public static final Func1<String, Boolean> STRING_IS_NOT_EMPTY = new Func1<String, Boolean>() { @Override public void String call(String string) { return !string.isEmpty(); } }; 

Por lo tanto, esto al menos elimina la necesidad de devolver Observable.just(null) y luego comprobar que en la cadena.

Puede mover su segundo concatMap al único lugar donde lo necesite – después de combinarLatest

  RxUtils.createEditTextChangeObservable(txtInput) .throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .concatMap(new Func1<EditText, Observable<Pair<String, List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(EditText editText) { String query = editText.getText().toString(); //searchStopResultFragment.setLastQuery(query); if (query.isEmpty()) { return Observable.just(null); } return Observable .combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() { @Override public Pair<String, List<Stop>> call(List<Stop> stops, String s) { return new Pair(s, stops); } }) .concatMap(new Func1<R, Observable<? extends Pair<String, List<Stop>>>>() { @Override public Observable<? extends Pair<String, List<Stop>>> call(R r) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() { @Override public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) { return Observable.just(locationNameResponse.getAddresses()); } }); } else { return Observable.just(queryAndStops.second); } } }); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()) .subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { if (stops != null) { searchStopResultFragment.showStops(stops); } else { searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

Resuelto con concatmap y combinar lo último:

  RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(EditText editText) { String query = editText.getText().toString(); //searchStopResultFragment.setLastQuery(query); if(query.isEmpty()){ return Observable.just(null); } return Observable.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() { @Override public Pair<String, List<Stop>> call(List<Stop> stops, String s) { return new Pair(s,stops); } }); } }).concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() { @Override public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) { if(queryAndStops!=null) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() { @Override public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) { return Observable.just(locationNameResponse.getAddresses()); } }); } else { return Observable.just(queryAndStops.second); } } return Observable.just(null); } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()).subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { if (stops != null) { searchStopResultFragment.showStops(stops); }else{ searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

Sin embargo, hay alguna forma más agradable para salir de la cadena sin enviar Observable.just (nulo) y comprobar si hay nulos en la próxima llamada?

  • Cómo cancelar la solicitud con retofit2 y RxAndroid
  • Android retofit + rxjava cómo procesar el parámetro de petición dinámica con repeatwhen
  • Prueba de Android Realm con RxJava - "abierto desde un hilo sin un Looper" Excepción
  • Android Studio que agrega la biblioteca de rxjava
  • RxJava: Cómo convertir Lista de objetos en Lista de otros objetos
  • Cómo manejar el error auth0 403 sin agregar código específico en todas partes (Retrofit / okhttp / RxAndroid)
  • Implementar reintento Cuando la lógica
  • Cancelar la suscripción de un rx.Single en RxJava
  • RxJava Observable a Completable, cómo evitar toBlocking ()
  • Flatten Observable <Observable <Cursor >> Observable <Cursor>
  • RxAndroid y Retrofit: No se puede crear el adaptador de llamada para io.reactivex.Observable <retrofit2.Response <okhttp3.ResponseBody >>
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.