RxJava: Averigüe si BehaviorSubject fue un valor repetido o no
Estoy haciendo una interfaz de Android que muestra algunos datos extraídos de la red. Quiero que muestre los últimos datos disponibles y que nunca esté vacío (a menos que todavía no se hayan obtenido datos), por lo que estoy usando un BehaviorSubject para dar a los suscriptores (mi interfaz de usuario) la información más reciente disponible, mientras se actualiza en El fondo para actualizarlo.
Esto funciona, pero debido a otro requisito en mi interfaz de usuario, ahora tengo que saber si o no el resultado publicado fue obtenido fresco de la red o no. (En otras palabras, necesito saber si el resultado publicado fue el elemento guardado de BehaviorSubject o no.)
- Android: Sondeo de un servidor con Retrofit
- Cómo anular los Observables compartidos, infinitos con un retraso después de que el último suscriptor no se haya suscrito
- La llamada onNext de PublishSubject en un hilo diferente después de actualizar a Retrofit 2.0
- ¿Cómo estructurar mi aplicación usando MVP con rxjava y retrofit para obtener datos de Observables?
- RxJava se vuelve a suscribir al evento tras la restauración de la actividad
¿Cómo puedo conseguir esto? Si necesito dividirlo en múltiples Observables, está bien, siempre y cuando sea capaz de obtener el comportamiento de caché de BehaviorSubject (obteniendo el último resultado disponible), al mismo tiempo ser capaz de decir si el resultado devuelto fue del caché o no. Un hacky manera que puedo pensar en hacerlo sería comprobar si la marca de tiempo de la respuesta fue relativamente pronto, pero que sería muy descuidado y prefiero encontrar una manera de hacerlo con RxJava.
- Acceso al dominio desde un subproceso incorrecto Excepción mientras se envió una copia usando copyFromRealm
- Solución global de errorHandling con RxJava sólo cuando onError no está implementado
- Encadenar dos observables de adaptación con RxJava
- Android gestiona la solicitud múltiple rxJava en el dispositivo de rotación
- Combinación de llamadas API con RX Java
- Hilo predeterminado de actualización
- RxJava - cómo invocar abonado en eventos de 4 clics en Android
- RxJava - Encadenamiento de solicitudes y actualización de la interfaz de usuario
Como mencionaste en la pregunta, esto se puede lograr con múltiples Observables. En esencia, usted tiene dos observables: " la respuesta fresca se puede observar ", y " la respuesta en caché se puede observar ". Si algo puede ser "observado", puede expresarlo como un Observable. Vamos a nombrar el primero original
y el segundo replayed
.
Ver este JSBin (JavaScript, pero los conceptos pueden ser traducidos directamente a Java. No hay un JavaBin por lo que sé, para estos propósitos).
var original = Rx.Observable.interval(1000) .map(function (x) { return {value: x, from: 'original'}; }) .take(4) .publish().refCount(); var replayed = original .map(function (x) { return {value: x.value, from: 'replayed'}; }) .replay(null, 1).refCount(); var merged = Rx.Observable.merge(original, replayed) .replay(null, 1).refCount() .distinctUntilChanged(function (obj) { return obj.value; }); console.log('subscribe 1st'); merged.subscribe(function (x) { console.log('subscriber1: value ' + x.value + ', from: ' + x.from); }); setTimeout(function () { console.log(' subscribe 2nd'); merged.subscribe(function (x) { console.log(' subscriber2: value ' + x.value + ', from: ' + x.from); }); }, 2500);
La idea general aquí es: anotar el evento con un campo from
indicar su origen. Si es original
, es una respuesta fresca. Si se replayed
, es una respuesta en caché. El original
observable emitirá solamente from: 'original'
y el Observable replayed
emitirá solamente from: 'replayed'
. En Java necesitaríamos un poco más de caligrafía porque necesitas hacer una clase para representar estos eventos anotados. De lo contrario los mismos operadores en RxJS se pueden encontrar en RxJava.
El Observable original es publish().refCount()
porque sólo queremos que una instancia de este flujo sea compartida con todos los observadores. De hecho, en RxJS y Rx.NET, share()
es un alias para publish().refCount()
.
El Observable replay(1).refCount()
es replay(1).refCount()
porque también se comparte igual que el original, pero replay(1)
nos da el comportamiento de almacenamiento en caché.
merged
Observable contiene tanto original como repetida, y esto es lo que debe exponer a todos los suscriptores. Dado que se replayed
inmediato emitir cuando original
hace original
, usamos distinctUntilChanged
en el valor del evento para ignorar consecutivos inmediatos. La razón por la que replay(1).refCount()
también la fusión es porque queremos que la fusión de original y repetición también sea una sola instancia compartida de una secuencia compartida entre todos los observadores. Habríamos usado publish().refCount()
para este propósito, pero no podemos perder el efecto de repetición que se replayed
contiene, por lo que es replay(1).refCount()
, no publish().refCount()
.
¿No Distinct cubrir su caso? BehaviorSubject sólo repite el elemento más reciente después de la suscripción.
Creo que lo que quieres es algo como esto:
private final BehaviorSubject<T> fetched = BehaviorSubject.create(); private final Observable<FirstTime<T>> _fetched = fetched.lift(new Observable.Operator<FirstTime<T>, T>() { private AtomicReference<T> last = new AtomicReference<>(); @Override public Subscriber<? super T> call(Subscriber<? super FirstTime<T>> child) { return new Subscriber<T>(child) { @Override public void onCompleted() { child.onCompleted(); } @Override public void onError(Throwable e) { child.onError(e); } @Override public void onNext(T t) { if (!Objects.equals(t, last.getAndSet(t))) { child.onNext(FirstTime.yes(t)); } else { child.onNext(FirstTime.no(t)); } } }; } }); public Observable<FirstTime<T>> getObservable() { return _fetched; } public static class FirstTime<T> { final boolean isItTheFirstTime; final T value; public FirstTime(boolean isItTheFirstTime, T value) { this.isItTheFirstTime = isItTheFirstTime; this.value = value; } public boolean isItTheFirstTime() { return isItTheFirstTime; } public T getValue() { return value; } public static <T> FirstTime<T> yes(T value) { return new FirstTime<>(true, value); } public static <T> FirstTime<T> no(T value) { return new FirstTime<>(false, value); } }
La clase wrapper FirstTime
tiene un booleano que se puede utilizar para ver si algún suscriptor al Observable lo ha visto antes.
Espero que ayude.
Almacene la información de BehaviorSubject objetos en una estructura de datos con una buena búsqueda, como un diccionario. Cada valor sería una clave y el valor sería el número de iteración.
Allí, cuando miras una clave particular, si tu diccionario ya contiene y su valor ya está en uno, entonces sabes que un valor es un valor repetido.
No estoy muy seguro de lo que quieres lograr. Probablemente le gustaría tener una fuente inteligente para los "últimos" datos y una segunda fuente que le dice cuando los datos se actualizaron?
BehaviorSubject<Integer> dataSubject = BehaviorSubject.create(42); // initial value, "never empty" Observable<String> refreshedIndicator = dataSubject.map(data -> "Refreshed!"); refreshedIndicator.subscribe(System.out::println); Observable<Integer> latestActualData = dataSubject.distinctUntilChanged(); latestActualData.subscribe( data -> System.out.println( "Got new data: " + data)); // simulation of background activity: Observable.interval(1, TimeUnit.SECONDS) .limit(100) .toBlocking() .subscribe(aLong -> dataSubject.onNext(ThreadLocalRandom.current().nextInt(2)));
Salida:
Refreshed! Got new data: 42 Refreshed! Got new data: 0 Refreshed! Refreshed! Refreshed! Got new data: 1 Refreshed! Got new data: 0 Refreshed! Got new data: 1
- Arrastrar y soltar, ListView y las vistas de elementos que pierden el evento ACTION_DRAG_STARTED
- String.xml me proporciona una cadena incorrecta para ProcessDialog