Encadenamiento de observables basado en resultados

Soy un principiante completo en rx-java y rx-android. He oído que la curva de aprendizaje es bastante empinada en el principio.

Im que intenta substituir todo el código basado en Eventbus a una alternativa más tipográfica usando rx-android.

He configurado este fragmento para crear observables desde editar texto cambios de texto:

Actividad principal

RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).subscribe(new Action1<EditText>() { @Override public void call(EditText editText) { searchStopResultFragment.query(editText.getText().toString()); } }); 

RxUtils:

 public static Observable<EditText> createEditTextChangeObservable(final EditText editText){ return Observable.create(new Observable.OnSubscribe<EditText>() { @Override public void call(final Subscriber<? super EditText> subscriber) { editText.addTextChangedListener(new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) { } @Override public void onTextChanged(CharSequence s, int start, int before, int count) { } @Override public void afterTextChanged(Editable s) { if (subscriber.isUnsubscribed()) return; subscriber.onNext(editText); } }); } }); } 

SearchStopResultFragment:

 public void query(String query){ lastQuery = query; resultObservable = StopProvider.getStopResultObservable(getActivity().getContentResolver(),query); subscription = resultObservable.subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<List<Stop>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<Stop> stops) { if(!lastQuery.equals("")) { if(stops.size()>0) { ArrayList<AdapterItem> items = adapter.getItems(); items.clear(); for (Stop stop : stops) { SearchResultStopItem item = new SearchResultStopItem(stop, SearchResultStopItem.STOP); items.add(item); } adapter.setItems(items); adapter.notifyDataSetChanged(); }else{ //DO A NOTHER ASYNC QUERY TO FETCH RESULTS } }else{ showStartItems(); } } }); } 

Se siente como que estoy haciendo esto mal. Creo nuevos observables del método de consulta en mi fragmento en cada evento de cambio de texto. También quiero crear una nueva operación de búsqueda asíncrona basada en el resultado en StopProvider.getStopResultObservable (ver el comentario)

¿Alguna cosa?

3 Solutions collect form web for “Encadenamiento de observables basado en resultados”

Aquí es lo que me ocurrió:

 RxUtils.createEditTextChangeObservable(txtInput) .throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .map(EXTRACT_STRING) .filter(STRING_IS_NOT_EMPTY) .concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(final String query) { return StopProvider.getStopResultObservable(getContentResolver(), query) .map(new Func1<List<Stop>, Pair<String, List<Stop>>>() { // I think this map is a bit more readable than the // combineLatest, and since "query" should not be changing // anyway, the result should be the same (you have to // declare it as final in the method signature, though @Override public Pair<String, List<Stop>> call(List<Stop> stops) { return new Pair(query, stops); } }); } ) .concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() { @Override public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first) .map(new Func1<LocationNameResponse, List<Stop>>() { @Override public List<Stop> call(LocationNameResponse locationNameResponse) { // since there was no if-else in your original code (you were always // just wrapping the List in an Observable) I removed that, too return locationNameResponse.getAddresses(); } }); } else { return Observable.just(queryAndStops.second); } } ) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .compose(this.<List<Stop>>bindToLifecycle()) .subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { // since I don't know what your API is returning I think // it's saver to keep this check in: if (stops != null) { searchStopResultFragment.showStops(stops); } else { searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

dónde:

 public static final Func1<EditText, String> EXTRACT_STRING = new Func1<EditText, String>() { @Override public void String call(EditText editText) { return editText.getText().toString(); } }; public static final Func1<String, Boolean> STRING_IS_NOT_EMPTY = new Func1<String, Boolean>() { @Override public void String call(String string) { return !string.isEmpty(); } }; 

Por lo tanto, esto al menos elimina la necesidad de devolver Observable.just(null) y luego comprobar que en la cadena.

Puede mover su segundo concatMap al único lugar donde lo necesite – después de combinarLatest

  RxUtils.createEditTextChangeObservable(txtInput) .throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) .concatMap(new Func1<EditText, Observable<Pair<String, List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(EditText editText) { String query = editText.getText().toString(); //searchStopResultFragment.setLastQuery(query); if (query.isEmpty()) { return Observable.just(null); } return Observable .combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() { @Override public Pair<String, List<Stop>> call(List<Stop> stops, String s) { return new Pair(s, stops); } }) .concatMap(new Func1<R, Observable<? extends Pair<String, List<Stop>>>>() { @Override public Observable<? extends Pair<String, List<Stop>>> call(R r) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() { @Override public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) { return Observable.just(locationNameResponse.getAddresses()); } }); } else { return Observable.just(queryAndStops.second); } } }); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()) .subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { if (stops != null) { searchStopResultFragment.showStops(stops); } else { searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

Resuelto con concatmap y combinar lo último:

  RxUtils.createEditTextChangeObservable(txtInput).throttleLast(200, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()).concatMap(new Func1<EditText, Observable<Pair<String,List<Stop>>>>() { @Override public Observable<Pair<String, List<Stop>>> call(EditText editText) { String query = editText.getText().toString(); //searchStopResultFragment.setLastQuery(query); if(query.isEmpty()){ return Observable.just(null); } return Observable.combineLatest(StopProvider.getStopResultObservable(getContentResolver(), query), Observable.just(query), new Func2<List<Stop>, String, Pair<String, List<Stop>>>() { @Override public Pair<String, List<Stop>> call(List<Stop> stops, String s) { return new Pair(s,stops); } }); } }).concatMap(new Func1<Pair<String, List<Stop>>, Observable<List<Stop>>>() { @Override public Observable<List<Stop>> call(Pair<String, List<Stop>> queryAndStops) { if(queryAndStops!=null) { if (queryAndStops.second.size() == 0) { return RestClient.service().locationName(queryAndStops.first).concatMap(new Func1<LocationNameResponse, Observable<? extends List<Stop>>>() { @Override public Observable<? extends List<Stop>> call(LocationNameResponse locationNameResponse) { return Observable.just(locationNameResponse.getAddresses()); } }); } else { return Observable.just(queryAndStops.second); } } return Observable.just(null); } }).subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()).compose(this.<List<Stop>>bindToLifecycle()).subscribe(new Action1<List<Stop>>() { @Override public void call(List<Stop> stops) { if (stops != null) { searchStopResultFragment.showStops(stops); }else{ searchStopResultFragment.showStartItems(); } } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { showError(throwable); } }); 

Sin embargo, hay alguna forma más agradable para salir de la cadena sin enviar Observable.just (nulo) y comprobar si hay nulos en la próxima llamada?

  • ¿Cómo usar un rxjava observable después de que se complete?
  • Cómo limpiar RxJava fuego y olvidar suscripciones?
  • Progreso de descarga con RxJava, OkHttp y Okio en Android
  • Rxandroid pide que se ejecute en el hilo ui aunque esté suscrito en AndroidSchedulers.mainThread ()
  • Patrón de repositorio con SqlBrite / SqlDelight (base de datos sin conexión) y Retrofit (solicitud Http)
  • ¿Cómo recuperar cuerpo de respuesta con RxAndroid y Retrofit 2?
  • ¿Es necesario darse de baja?
  • Cómo crear un observable de OnClick evento Android?
  • ¿Qué está causando HTTP FALLA: java.net.SocketException: Socket cerrado?
  • Android Pros y Contras: Event Bus y RxJava
  • Encadenamiento de llamadas de Retrofit con RxJava y devolución del objeto principal
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.