¿Por qué necesita onBackpressure () aquí para eventos de clic?
Estoy tratando de hacer el manejo de errores para alguna acción vinculada a los clics de botón. Para el enlace uso RxAndroid + RxAndroid. Parece que debe trabajar con el código de abajo, pero no con la línea comentada con onBackpressure()
:
CurrentUser signIn() { throw new RuntimeException(); } Integer x = 1; PublishSubject<Throwable> loginingFailedSubject = PublishSubject.create(); @Override public void onStart() { super.onStart(); RxView.clicks(loginButton) .observeOn(AndroidSchedulers.mainThread()) .doOnNext((v) -> setLoginingWaiting()) .observeOn(Schedulers.newThread()) .map((v) -> signIn()) .lift(new SuppressErrorOperator<>(throwable -> { Log.e("MyTag", "Oops, failed " + x.toString() + " times!"); ++x; loginingFailedSubject.onNext(throwable); })) //.onBackpressureBuffer() .observeOn(AndroidSchedulers.mainThread()) .subscribe(user -> setLoginedUser(user)); loginingFailedSubject .observeOn(AndroidSchedulers.mainThread()) .subscribe(throwable -> setLoginingFailed(throwable)); }
Y esto es el código SuppressErrorOperator
:
- RxJava como autobús de eventos?
- La comprensión incorrecta del búfer en RxJava
- Solo observable con varios suscriptores
- Schedulers.io () no vuelve al hilo principal
- Android: Sondeo de un servidor con Retrofit
public final class SuppressErrorOperator<T> implements Observable.Operator<T, T> { final Action1<Throwable> errorHandler; public SuppressErrorOperator(Action1<Throwable> errorHandler) { this.errorHandler = errorHandler; } public SuppressErrorOperator() {this(null);} @Override public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { return new Subscriber<T>(subscriber) { @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { if (errorHandler != null) { errorHandler.call(e); } } @Override public void onNext(T t) { subscriber.onNext(t); } }; } }
Y esto es lo que conseguí en mi logcat en último después de 100 clics: ¡ Oops, failed 16 times!
Se detiene después de exaclty 16 veces y el 17, se ejecuta a través de setLoginingWaiting()
(lo veo, porque este método deshabilita el botón que también significa, nadie puede hacer clic más de 1 veces por solicitud o cerca de ese número) y .. eso es todo . Parece que no llega a .lift()
en absoluto.
Pero si .onBackpressureBuffer()
, funciona perfectamente ahora! Leo mucho sobre la contrapresión. Incluso paso todo el día para entender el código fuente de Observable
, Subscriber
etc
Sé, lo que 16 – es un tamaño constante para el búfer de Android. ¿Pero por qué es golpeado? No hago clic en el botón que a menudo. Además, no hay onNext()
en absoluto, por lo que el buffer no puede exceder en cualquier caso! Todos onError()
tragado por Operator
.
También sé que observeOn()
funciona a través de un protocolo pull para que de forma interna quiera usar request()
. Y si comento el último observeOn()
antes de .subscribe(user -> setLoginedUser(user));
– funcionará también (pero por supuesto, es inaceptable).
Pero ¿por qué y qué es lo que necesita sobre la onBackpressure()
aquí para estar vivo? Además, ¿por qué muere sin ninguna excepción como MissingBackpressureException
o algo así?
- Utilizando RxJava para la validación de inicio de sesión de correo electrónico, un observable está emitiendo dos veces
- RxJava y MVP en la aplicación para Android
- Realm + Retrofit + Rxjava
- ¿Tenemos alguna posibilidad de detener la solicitud en OkHttp Interceptor?
- Obtenga el código de estado de la respuesta utilizando Retrofit 2.0 y RxJava
- Utilizar la programación con RxAndroid
- RxJava y datos almacenados en caché
- ¿Cómo utilizar Flowable en RxJava 2?
El problema es que interfiere con el ciclo de vida del flujo. map
bloquea pero suprime la excepción y no hay ningún valor enviado a la parte inferior. Si downstream no obtiene ningún valor, no puede saber que debe solicitar más, por lo que toda la secuencia se detiene, dejando que los búferes se llenen. onBackpressureBuffer
sólo funciona porque solicita Long.MAX_VALUE y mantiene la fuente en ejecución.
Tenga en cuenta, sin embargo, que el map
no debería funcionar de la manera que lo hace ahora, sino anular la suscripción en el primer signo de error de la función.
La alternativa correcta sería:
RxView.clicks(loginButton) .observeOn(AndroidSchedulers.mainThread()) .doOnNext((v) -> setLoginingWaiting()) .observeOn(Schedulers.newThread()) .flatMap(v -> { try { return Observable.just(signIn()); } catch (Throwable ex) { Log.e("MyTag", "Oops, failed " + x.toString() + " times!"); ++x; loginingFailedSubject.onNext(ex); return Observable.empty(); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(user -> setLoginedUser(user));
OperatorObserveOn tiene una cola con un tamaño de RxRingBuffer.SIZE (16 en android), si excede el tamaño de esta cola se lanzará una excepción MissingBackpressure.
En general, para evitar problemas de contrapresión, puede limitar los hilos de conmutación, limitar la emisión de eventos (acelerador, búfer, etc.) o utilizar los operadores de BypasspressXXX.
Aunque parece que en su caso – botón de inicio de sesión – sólo necesita manejar una solicitud a la vez, así que ¿por qué no ocultar el botón con ProgressBar o establecer enable (false) para el momento de una solicitud
Supongo que hay un error en OperatorObserveOn::ObserveOnSubscriber
.
En el método init establece el nuevo productor y el incremento requested
(de ObserveOnSubscriber
) por el suscriptor interno (hijo) requested
. Pero cuando ObserveOnSubscriber
es el niño tiene un valor por defecto (en el caso de Android – 16) de Subscriber::requested
ObserveOnSubscriber::requested
y el valor real de requested
está en ObserveOnSubscriber::requested
.
El problema es que Subscriber::requested
se utiliza y no se actualiza. Así que la solución es reescribir el método init de ObserveOnSubscriber
(acabo de copiar las fuentes de OperatorObserveOn y crear la copia del mismo en el paquete con el mismo nombre que en RxJava).
void init() { // don't want this code in the constructor because `this` can escape through the // setProducer call Subscriber<? super T> localChild = child; localChild.setProducer(new Producer() { @Override public void request(long n) { if (n > 0L) { BackpressureUtils.getAndAddRequest(requested, n); ObserveOnSubscriber.this.request(requested.get()); schedule(); } } }); localChild.add(recursiveScheduler); localChild.add(this); }
También he creado problema en github.
- NoSuchMethodError: java.lang.Long.hashCode
- cómo cambiar la dirección del título de CollapsingToolbarLayout?