RXJava – hacer un observable pausable (con buffer y ventana, por ejemplo)

Quiero crear observables que hagan lo siguiente:

  • Almacenar todos los elementos, mientras están en pausa
  • Inmediatamente emiten elementos, mientras que no se detienen
  • El gatillo de pausa / reanudación debe provenir de otro observable
  • Debe ser guardar para ser utilizado por observables que no se ejecutan en el hilo principal y debe ser guardar cambiar el estado de pausa / reanudación del hilo principal

Quiero usar un BehaviorSubject<Boolean> como desencadenador y enlazar este desencadenador al evento onResume y onPause una actividad. (Ejemplo de código adjunto)

Pregunta

He configurado algo, pero no está funcionando como estaba previsto. Lo uso como sigue:

 Observable o = ...; // Variant 1 o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) // Variant 2 // o = o.compose(RXPauser.applyPauser(getPauser())); o .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(); 

Actualmente el problema es que la Variante 1 debería funcionar bien, pero a veces, los eventos no son emitidos – la válvula no emite, hasta que la válvula todo funcione (puede ser un problema de roscado …)! La solución 2 es mucho más simple y parece funcionar, pero no estoy seguro si es realmente mejor, no lo creo. En realidad no estoy seguro, ¿por qué una solución es fallar a veces, así que no estoy seguro si la solución 2 resuelve el problema (actualmente para mí desconocido) …

¿Puede alguien decirme cuál podría ser el problema o si la solución simple debe trabajar confiablemente? ¿O puede mostrarme una solución confiable?

Código

RxValue

Https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

Funciones de RXPauser

 public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) { return observable -> pauser(observable, pauser); } private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) { // this observable buffers all items that are emitted while emission is paused Observable<T> sharedSource = source.publish().refCount(); Observable<T> queue = sharedSource .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) .flatMap(l -> Observable.from(l)) .doOnNext(t -> Ld(RXPauser.class, "Pauser QUEUED: " + t)); // this observable emits all items that are emitted while emission is not paused Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) .switchMap(tObservable -> tObservable) .doOnNext(t -> Ld(RXPauser.class, "Pauser NOT QUEUED: " + t)); // combine both observables return queue.mergeWith(window) .doOnNext(t -> Ld(RXPauser.class, "Pauser DELIVERED: " + t)); } 

Actividad

 public class BaseActivity extends AppCompatActivity { private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); public BaseActivity(Bundle savedInstanceState) { super(args); final Class<?> clazz = this.getClass(); pauser .doOnUnsubscribe(() -> { Ld(clazz, "Pauser unsubscribed!"); }) .subscribe(aBoolean -> { Ld(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); }); } public PublishSubject<Boolean> getPauser() { return pauser; } @Override protected void onResume() { super.onResume(); pauser.onNext(true); } @Override protected void onPause() { pauser.onNext(false); super.onPause(); } } 

Puede utilizar el operador .buffer() pasándolo observable, definiendo cuándo parar el almacenamiento en búfer, muestra del libro:

 Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) .subscribe(System.out::println); 

Del capítulo 5, "Domar la secuencia": https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted% 20sequencias.md

Puede utilizar PublishSubject como Observable para alimentar elementos en su operador personalizado. Cada vez que necesite iniciar el almacenamiento en búfer, cree la instancia por Observable.defer(() -> createBufferingValve())

Hice algo parecido para registrar eventos.
El sujeto recoge algunos eventos, y una vez en 10 segundos los empuja al servidor.

La idea principal es, por ejemplo, tienes clase Event .

 public class Event { public String jsonData; public String getJsonData() { return jsonData; } public Event setJsonData(String jsonData) { this.jsonData = jsonData; return this; } } 

Debe crear cola para eventos:

 private PublishSubject<Event> eventQueue = PublishSubject.create(); 

Puede ser BehaviorSubject , no importa

A continuación, debe crear la lógica, que se encargará de empujar eventos para el servidor:

  eventObservable = eventQueue.asObservable() .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds .toList() .doOnNext(new Action1<List<Event>>() { @Override public void call(List<Event> events) { apiClient.pushEvents(events); //push your event } }) .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { @Override public Observable<List<Event>> call(Throwable throwable) { return null; //make sure, that on error will be never called } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()); 

A continuación, debe suscribirse a él, y mantener la suscripción, hasta que no lo necesita:

 eventSubscription = eventObservable.subscribe() 

Inicio esto ayuda

  • Por qué definir Flowable podría recibir actualizaciones de la base de datos
  • Escritura de pruebas síncronas para Rx v2 Flowable
  • Combine RxTextView Observable y Retrofit Observable
  • Cómo utilizar CompositeDisposable de RxJava 2?
  • RxJava: "java.lang.IllegalStateException: Sólo un suscriptor permitido!"
  • Dar un RxJava Observable algo a emitir de otro método
  • ¿Cómo utilizar Rx para gestionar múltiples observadores con el fin de mantener una sola conexión abierta a un servicio?
  • ¿Por qué debounce () con toList () no funciona en RxAndroid?
  • MissingBackpressureException incluso después de llamar a onBackpressureBlock ()
  • Prueba de unidad de Android con Retrofit / RxJava / Roboletric y Mockito
  • Hacer cola de tareas con RxJava en Android
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.