Combine RxTextView Observable y Retrofit Observable

Como ejemplo para empezar con RxAndroid, estoy intentando implementar un searchbox que activa una llamada de reposo cuando los usuarios insertan algo.

Hasta ahora tengo dos partes de trabajo. La primera observando la EditTextView …

RxTextView.textChangeEvents(searchEditText) .debounce(400, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<TextViewTextChangeEvent>() { @Override public void onCompleted() { Timber.d("onCompleted"); } @Override public void onError(Throwable e) { Timber.e(e, "onError"); } @Override public void onNext(TextViewTextChangeEvent e) { Timber.d("onNext" + e.text().toString()); } }); 

… y la segunda parte que llama a la API REST mediante el uso de un Servicio de Retrofit:

 APIManager.getService().searchRestaurants("test") .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<Restaurant>>() { @Override public void onCompleted() { Timber.d("onCompleted"); } @Override public void onError(Throwable e) { Timber.e(e, "onError"); } @Override public void onNext(List<Restaurant> restaurants) { Timber.d("onNext"); for (Restaurant restaurant : restaurants) { Timber.d(restaurant.getId() + ": " + restaurant.getName()); } } }); 

Mi problema es combinar las dos partes. Intenté usar el operador del flatMap como sigue:

 RxTextView.textChangeEvents(searchEditText) .debounce(400, TimeUnit.MILLISECONDS) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<TextViewTextChangeEvent, Observable<List<Restaurant>>>() { @Override public Observable<List<Restaurant>> call(TextViewTextChangeEvent txtChangeEvt) { return APIManager.getService().searchRestaurants(txtChangeEvt.text().toString()); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<List<Restaurant>>() { @Override public void onCompleted() { Timber.d("onCompleted"); } @Override public void onError(Throwable e) { Timber.e(e, "onError"); } @Override public void onNext(List<Restaurant> restaurants) { Timber.d("onNext"); for (Restaurant restaurant : restaurants) { Timber.d(restaurant.getId() + ": " + restaurant.getName()); } } }); 

Cuando hago esto obtengo la siguiente excepción:

 java.lang.IllegalStateException: Must be called from the main thread. Was: Thread[RxCachedThreadScheduler-1,5,main] at com.jakewharton.rxbinding.internal.Preconditions.checkUiThread(Preconditions.java:28) at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:21) at com.jakewharton.rxbinding.widget.TextViewTextChangeEventOnSubscribe.call(TextViewTextChangeEventOnSubscribe.java:12) 

Así que traté de solucionarlo llamando a subscribeOn(AndroidSchedulers.mainThread() pero en este caso, por supuesto, obtengo una excepción de NetworkOnMainThread.

Entonces, ¿cómo hago esto? ¿Cuál es una manera apropiada de combinar diferentes Observables que deben ejecutarse en diferentes Hilos?

Sólo .observeOn(AndroidSchedulers.mainThread()) quitar el primer .observeOn(AndroidSchedulers.mainThread()) . Echa un vistazo a este ejemplo

 Observable.just(1) // 1 will be emited in the IO thread pool .subscribeOn(Schedulers.io()) .flatMap(...) // will be in the IO thread pool .observeOn(Schedulers.computation()) .flatMap(...) // will be executed in the computation thread pool .observeOn(AndroidSchedulers.mainThread()) .subscribe(); // will be executed in the Android main thread (if you're running your code on Android) 
  • Cancelar la suscripción de un rx.Single en RxJava
  • RxJava - subir archivos secuencialmente - emitir el siguiente elemento, cuando onNext llamado
  • Prueba de RxBinding RxSearchView
  • Acceso al dominio desde un subproceso incorrecto Excepción mientras se envió una copia usando copyFromRealm
  • Android AsyncTask vs hilo + controlador vs rxjava
  • Cómo encadenar los métodos groupBy () de RxJava como groupBy (). GroupBy ()
  • InterruptedIOException al cambiar de mainThread () a io ()
  • RxJava: Se produjo un error al intentar propagar el error a Observer.onError
  • InterruptedException en el subproceso de caché RxJava al depurar en Android
  • Suscribir 2 diferentes Observable y onNext ambos?
  • RxJava y eventos esporádicos al azar en Android
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.