RxJava: Averigüe si BehaviorSubject fue un valor repetido o no

Estoy haciendo una interfaz de Android que muestra algunos datos extraídos de la red. Quiero que muestre los últimos datos disponibles y que nunca esté vacío (a menos que todavía no se hayan obtenido datos), por lo que estoy usando un BehaviorSubject para dar a los suscriptores (mi interfaz de usuario) la información más reciente disponible, mientras se actualiza en El fondo para actualizarlo.

Esto funciona, pero debido a otro requisito en mi interfaz de usuario, ahora tengo que saber si o no el resultado publicado fue obtenido fresco de la red o no. (En otras palabras, necesito saber si el resultado publicado fue el elemento guardado de BehaviorSubject o no.)

¿Cómo puedo conseguir esto? Si necesito dividirlo en múltiples Observables, está bien, siempre y cuando sea capaz de obtener el comportamiento de caché de BehaviorSubject (obteniendo el último resultado disponible), al mismo tiempo ser capaz de decir si el resultado devuelto fue del caché o no. Un hacky manera que puedo pensar en hacerlo sería comprobar si la marca de tiempo de la respuesta fue relativamente pronto, pero que sería muy descuidado y prefiero encontrar una manera de hacerlo con RxJava.

Como mencionaste en la pregunta, esto se puede lograr con múltiples Observables. En esencia, usted tiene dos observables: " la respuesta fresca se puede observar ", y " la respuesta en caché se puede observar ". Si algo puede ser "observado", puede expresarlo como un Observable. Vamos a nombrar el primero original y el segundo replayed .

Ver este JSBin (JavaScript, pero los conceptos pueden ser traducidos directamente a Java. No hay un JavaBin por lo que sé, para estos propósitos).

 var original = Rx.Observable.interval(1000) .map(function (x) { return {value: x, from: 'original'}; }) .take(4) .publish().refCount(); var replayed = original .map(function (x) { return {value: x.value, from: 'replayed'}; }) .replay(null, 1).refCount(); var merged = Rx.Observable.merge(original, replayed) .replay(null, 1).refCount() .distinctUntilChanged(function (obj) { return obj.value; }); console.log('subscribe 1st'); merged.subscribe(function (x) { console.log('subscriber1: value ' + x.value + ', from: ' + x.from); }); setTimeout(function () { console.log(' subscribe 2nd'); merged.subscribe(function (x) { console.log(' subscriber2: value ' + x.value + ', from: ' + x.from); }); }, 2500); 

La idea general aquí es: anotar el evento con un campo from indicar su origen. Si es original , es una respuesta fresca. Si se replayed , es una respuesta en caché. El original observable emitirá solamente from: 'original' y el Observable replayed emitirá solamente from: 'replayed' . En Java necesitaríamos un poco más de caligrafía porque necesitas hacer una clase para representar estos eventos anotados. De lo contrario los mismos operadores en RxJS se pueden encontrar en RxJava.

El Observable original es publish().refCount() porque sólo queremos que una instancia de este flujo sea compartida con todos los observadores. De hecho, en RxJS y Rx.NET, share() es un alias para publish().refCount() .

El Observable replay(1).refCount() es replay(1).refCount() porque también se comparte igual que el original, pero replay(1) nos da el comportamiento de almacenamiento en caché.

merged Observable contiene tanto original como repetida, y esto es lo que debe exponer a todos los suscriptores. Dado que se replayed inmediato emitir cuando original hace original , usamos distinctUntilChanged en el valor del evento para ignorar consecutivos inmediatos. La razón por la que replay(1).refCount() también la fusión es porque queremos que la fusión de original y repetición también sea una sola instancia compartida de una secuencia compartida entre todos los observadores. Habríamos usado publish().refCount() para este propósito, pero no podemos perder el efecto de repetición que se replayed contiene, por lo que es replay(1).refCount() , no publish().refCount() .

¿No Distinct cubrir su caso? BehaviorSubject sólo repite el elemento más reciente después de la suscripción.

Creo que lo que quieres es algo como esto:

 private final BehaviorSubject<T> fetched = BehaviorSubject.create(); private final Observable<FirstTime<T>> _fetched = fetched.lift(new Observable.Operator<FirstTime<T>, T>() { private AtomicReference<T> last = new AtomicReference<>(); @Override public Subscriber<? super T> call(Subscriber<? super FirstTime<T>> child) { return new Subscriber<T>(child) { @Override public void onCompleted() { child.onCompleted(); } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onNext(T t) { if (!Objects.equals(t, last.getAndSet(t))) { child.onNext(FirstTime.yes(t)); } else { child.onNext(FirstTime.no(t)); } } }; } }); public Observable<FirstTime<T>> getObservable() { return _fetched; } public static class FirstTime<T> { final boolean isItTheFirstTime; final T value; public FirstTime(boolean isItTheFirstTime, T value) { this.isItTheFirstTime = isItTheFirstTime; this.value = value; } public boolean isItTheFirstTime() { return isItTheFirstTime; } public T getValue() { return value; } public static <T> FirstTime<T> yes(T value) { return new FirstTime<>(true, value); } public static <T> FirstTime<T> no(T value) { return new FirstTime<>(false, value); } } 

La clase wrapper FirstTime tiene un booleano que se puede utilizar para ver si algún suscriptor al Observable lo ha visto antes.

Espero que ayude.

Almacene la información de BehaviorSubject objetos en una estructura de datos con una buena búsqueda, como un diccionario. Cada valor sería una clave y el valor sería el número de iteración.

Allí, cuando miras una clave particular, si tu diccionario ya contiene y su valor ya está en uno, entonces sabes que un valor es un valor repetido.

No estoy muy seguro de lo que quieres lograr. Probablemente le gustaría tener una fuente inteligente para los "últimos" datos y una segunda fuente que le dice cuando los datos se actualizaron?

  BehaviorSubject<Integer> dataSubject = BehaviorSubject.create(42); // initial value, "never empty" Observable<String> refreshedIndicator = dataSubject.map(data -> "Refreshed!"); refreshedIndicator.subscribe(System.out::println); Observable<Integer> latestActualData = dataSubject.distinctUntilChanged(); latestActualData.subscribe( data -> System.out.println( "Got new data: " + data)); // simulation of background activity: Observable.interval(1, TimeUnit.SECONDS) .limit(100) .toBlocking() .subscribe(aLong -> dataSubject.onNext(ThreadLocalRandom.current().nextInt(2))); 

Salida:

 Refreshed! Got new data: 42 Refreshed! Got new data: 0 Refreshed! Refreshed! Refreshed! Got new data: 1 Refreshed! Got new data: 0 Refreshed! Got new data: 1 
  • Observable.just () que devuelve Unidad en Kotlin
  • Compartiendo RxBindings Eventos observables entre múltiples suscriptores
  • ¿Cómo terminar un observable?
  • Dar un RxJava Observable algo a emitir de otro método
  • Manejo de listas con RxJava y Retrofit en android
  • Retrofit con rxjava manejando excepciones de red a nivel mundial
  • ¿Qué hilo es unSubscribeOn llamado? ¿Deberíamos llamarlo?
  • Android.os.NetworkOnMainThreadException usando rxjava en android
  • Android retofit + rxjava cómo procesar el parámetro de petición dinámica con repeatwhen
  • Prueba de unidad de Android con Retrofit / RxJava / Roboletric y Mockito
  • Manera eficiente de manipular los subprocesos RxJava
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.