¿Por qué necesita onBackpressure () aquí para eventos de clic?

Estoy tratando de hacer el manejo de errores para alguna acción vinculada a los clics de botón. Para el enlace uso RxAndroid + RxAndroid. Parece que debe trabajar con el código de abajo, pero no con la línea comentada con onBackpressure() :

 CurrentUser signIn() { throw new RuntimeException(); } Integer x = 1; PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create(); @Override public void onStart() { super.onStart(); RxView.clicks(loginButton) .observeOn(AndroidSchedulers.mainThread()) .doOnNext((v) -> setLoginingWaiting()) .observeOn(Schedulers.newThread()) .map((v) -> signIn()) .lift(new SuppressErrorOperator<>(throwable -> { Log.e("MyTag", "Oops, failed " + x.toString() + " times!"); ++x; loginingFailedSubject.onNext(throwable); })) //.onBackpressureBuffer() .observeOn(AndroidSchedulers.mainThread()) .subscribe(user -> setLoginedUser(user)); loginingFailedSubject .observeOn(AndroidSchedulers.mainThread()) .subscribe(throwable -> setLoginingFailed(throwable)); } 

Y esto es el código SuppressErrorOperator :

 public final class SuppressErrorOperator<T> implements Observable.Operator<T, T> { final Action1<Throwable> errorHandler; public SuppressErrorOperator(Action1<Throwable> errorHandler) { this.errorHandler = errorHandler; } public SuppressErrorOperator() {this(null);} @Override public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { return new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { if (errorHandler != null) { errorHandler.call(e); } } @Override public void onNext(T t) { subscriber.onNext(t); } }; } } 

Y esto es lo que conseguí en mi logcat en último después de 100 clics: ¡ Oops, failed 16 times!

Se detiene después de exaclty 16 veces y el 17, se ejecuta a través de setLoginingWaiting() (lo veo, porque este método deshabilita el botón que también significa, nadie puede hacer clic más de 1 veces por solicitud o cerca de ese número) y .. eso es todo . Parece que no llega a .lift() en absoluto.

Pero si .onBackpressureBuffer() , funciona perfectamente ahora! Leo mucho sobre la contrapresión. Incluso paso todo el día para entender el código fuente de Observable , Subscriber etc

Sé, lo que 16 – es un tamaño constante para el búfer de Android. ¿Pero por qué es golpeado? No hago clic en el botón que a menudo. Además, no hay onNext() en absoluto, por lo que el buffer no puede exceder en cualquier caso! Todos onError() tragado por Operator .

También sé que observeOn() funciona a través de un protocolo pull para que de forma interna quiera usar request() . Y si comento el último observeOn() antes de .subscribe(user -> setLoginedUser(user)); – funcionará también (pero por supuesto, es inaceptable).

Pero ¿por qué y qué es lo que necesita sobre la onBackpressure() aquí para estar vivo? Además, ¿por qué muere sin ninguna excepción como MissingBackpressureException o algo así?

El problema es que interfiere con el ciclo de vida del flujo. map bloquea pero suprime la excepción y no hay ningún valor enviado a la parte inferior. Si downstream no obtiene ningún valor, no puede saber que debe solicitar más, por lo que toda la secuencia se detiene, dejando que los búferes se llenen. onBackpressureBuffer sólo funciona porque solicita Long.MAX_VALUE y mantiene la fuente en ejecución.

Tenga en cuenta, sin embargo, que el map no debería funcionar de la manera que lo hace ahora, sino anular la suscripción en el primer signo de error de la función.

La alternativa correcta sería:

 RxView.clicks(loginButton) .observeOn(AndroidSchedulers.mainThread()) .doOnNext((v) -> setLoginingWaiting()) .observeOn(Schedulers.newThread()) .flatMap(v -> { try { return Observable.just(signIn()); } catch (Throwable ex) { Log.e("MyTag", "Oops, failed " + x.toString() + " times!"); ++x; loginingFailedSubject.onNext(ex); return Observable.empty(); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(user -> setLoginedUser(user)); 

OperatorObserveOn tiene una cola con un tamaño de RxRingBuffer.SIZE (16 en android), si excede el tamaño de esta cola se lanzará una excepción MissingBackpressure.

En general, para evitar problemas de contrapresión, puede limitar los hilos de conmutación, limitar la emisión de eventos (acelerador, búfer, etc.) o utilizar los operadores de BypasspressXXX.

Aunque parece que en su caso – botón de inicio de sesión – sólo necesita manejar una solicitud a la vez, así que ¿por qué no ocultar el botón con ProgressBar o establecer enable (false) para el momento de una solicitud

Supongo que hay un error en OperatorObserveOn::ObserveOnSubscriber .

En el método init establece el nuevo productor y el incremento requested (de ObserveOnSubscriber ) por el suscriptor interno (hijo) requested . Pero cuando ObserveOnSubscriber es el niño tiene un valor por defecto (en el caso de Android – 16) de Subscriber::requested ObserveOnSubscriber::requested y el valor real de requested está en ObserveOnSubscriber::requested .
El problema es que Subscriber::requested se utiliza y no se actualiza. Así que la solución es reescribir el método init de ObserveOnSubscriber (acabo de copiar las fuentes de OperatorObserveOn y crear la copia del mismo en el paquete con el mismo nombre que en RxJava).

  void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<? super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); ObserveOnSubscriber.this.request(requested.get()); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); } 

También he creado problema en github.

  • RxJava y eventos esporádicos al azar en Android
  • convertir a lambda de rx java expresión
  • RXJava - hacer un observable pausable (con buffer y ventana, por ejemplo)
  • Okhttp ignora la configuración de Dispatcher cuando se usa con Retrofit RxJavaCallAdapterFactory
  • Prueba de unidad JVM con Mockito para pruebas Retrofit2 y RxJava para solicitudes de red
  • ¿Es posible obtener 2 valores en onNext () de suscriptor en rxjava android?
  • Prueba Espresso con RxLoader
  • Default Schedulers para rxjava en Android
  • Cómo manejar clics de artículo para una vista de reciclar usando RxJava
  • Usando observeOn () el hilo de interfaz de usuario de Android bloquea mi prueba de emulador
  • No vuelva a ejecutar la llamada de Retrofit si todavía está en curso con RxJava 2
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.