Sustitución de EventBus por RxJava – N suscriptores siempre escuchando

Estoy reemplazando un patrón RxJava con RxJava en una aplicación para Android. Tuve eventos para alertar a las partes interesadas de las actualizaciones de los datos en mi singleton caché. Cada vez que se llamaba a un servicio web, los datos se actualizarían y los suscriptores serían alertados a través de un evento publicado.

Tengo algo cercano a este conjunto en RxJava con AsyncSubject . Los observadores obtienen un solo evento del sujeto, pero luego reciben un evento onComplete y se dan de baja. Esto funciona a medida que la interfaz de usuario se carga primero, pero cuando hay que actualizar los datos, no hay suscriptores que se notifiquen. ¿Cómo puedo decirle a esos Subscribers que sigan escuchando más onNext sucesos siguientes del Subject ?

Necesito un Subject que informe el elemento más reciente. PublishSubject sólo emite elementos después de la suscripción, por lo que no cumple mis necesidades. Mis suscriptores empiezan a observar en diferentes momentos (posiblemente después del primer evento de datos), así que necesito que el Sujeto emita el último elemento observado y luego mantenga el flujo abierto para los elementos siguientes. Parece que una combinación de AsyncSubject y PublishSubject es lo que necesito. ¿Hay alguna manera de lograr esto con las clases incorporadas, o necesito crear mi propio tema?

 WebServiceObservable OR CacheObservable ^ | AsyncSubject ^ | / \ / \ / \ UiObserver1 UiObserver2 

Un poco de respuesta tardía, pero una opción un poco mejor para su scenarion que el BehaviorSubject podría ser BehaviorRelay de RxRelay lib . Y también para soluciones más globales cuando se necesitan diferentes comportamientos, pero quieren compartir punto único de interacción entre todos los módulos que puede utilizar RxHub

BehaviorSubject se ajustará a sus necesidades. https://github.com/Netflix/RxJava/wiki/Subject#behaviorsubject

Si necesita un comportamiento más sofisticado, siempre puede escribir su propia implementación de Subject . Parece bastante sencillo hacerlo.

Creo que es más simple si se utiliza BehaviorSubject con el operador switchOnNext.

SwitchOnNext () convierte un Observable que emite Observables (BehaviorSubject en este ejemplo) en un solo Observable que emite los ítems emitidos por los más recientemente emitidos de los Observables

El Observable devuelto por switchOnNext () se cancela la suscripción de la previamente emitida Observable empieza a emitir elementos de la última Observable

 public class PhotoModel{ BehaviorSubject<Observable<Photo>> subject = BehaviorSubject.create(...); public void setUserId(String id){ subject.onNext(Api.getUserPhoto(photoId)); } public Observable<Photo> subscribeToPhoto(){ return Observable.switchOnNext(subject); } } 

¿Cuándo se debe usar RxJava Observable y cuándo Callback simple en Android?

  • RxJava onCompleted y onTerminate en el hilo principal
  • InterruptedIOException al cambiar de mainThread () a io ()
  • Realm, RxJava, asObservable () y doOnUnsubscribe ()
  • IdlingResource Espresso con RxJava
  • Robolectric + OkHttp + retrofit + prueba de unidad rxJava
  • Hilo predeterminado de actualización
  • Android RxJava 2 Prueba JUnit - getMainLooper en android.os.Looper no se burla de RuntimeException
  • Obtenga el valor actual de RxJava Observable
  • Uso de RoboSpice y RxJava
  • ¿Qué es RxJava equivalente de orElse
  • Cómo manejar diferentes tipos de errores en Retrofit Rx onError sin instancia fea
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.