RxJava: "java.lang.IllegalStateException: Sólo un suscriptor permitido!"

Estoy usando RxJava para calcular la correlación automática normalizada sobre algunos datos de sensores en Android. Curiosamente, mi código lanza una excepción ("java.lang.IllegalStateException: Sólo se permite un suscriptor!") Y no estoy seguro de qué hacer con él: sé que GroupedObservables podría lanzar esta excepción al suscribirse a mis múltiples suscriptores, pero No creo que esté usando tal cosa en ninguna parte.

A continuación encontrará el método que (lo más probable) activa la excepción:

public Observable<Float> normalizedAutoCorrelation(Observable<Float> observable, final int lag) { Observable<Float> laggedObservable = observable.skip(lag); Observable<Float> meanObservable = mean(observable, lag); Observable<Float> laggedMeanObservable = mean(laggedObservable, lag); Observable<Float> standardDeviationObservable = standardDeviation(observable, meanObservable, lag); Observable<Float> laggedStandardDeviationObservable = standardDeviation(laggedObservable, laggedMeanObservable, lag); Observable<Float> deviation = observable.zipWith(meanObservable, new Func2<Float, Float, Float>() { @Override public Float call(Float value, Float mean) { return value - mean; } }); Observable<Float> laggedDeviation = observable.zipWith(laggedMeanObservable, new Func2<Float, Float, Float>() { @Override public Float call(Float value, Float mean) { return value - mean; } }); Observable<Float> autoCorrelationPartObservable = deviation.zipWith(laggedDeviation, new Func2<Float, Float, Float>() { @Override public Float call(Float value, Float laggedValue) { return value * laggedValue; } }); Observable<Float> autoCorrelationObservable = flatten(autoCorrelationPartObservable.window(lag, 1).scan(new Func2<Observable<Float>, Observable<Float>, Observable<Float>>() { @Override public Observable<Float> call(Observable<Float> memoObservable, Observable<Float> observable) { if(memoObservable == null) return observable; return memoObservable.zipWith(observable, new Func2<Float, Float, Float>() { @Override public Float call(Float memo, Float value) { return memo + value; } }); } })); Observable<Float> normalizationObservable = standardDeviationObservable.zipWith(laggedStandardDeviationObservable, new Func2<Float, Float, Float>() { @Override public Float call(Float standardDeviation, Float laggedStandardDeviation) { return lag * standardDeviation * laggedStandardDeviation; } }); return autoCorrelationObservable.zipWith(normalizationObservable, new Func2<Float, Float, Float>() { @Override public Float call(Float autoCorrelation, Float normalization) { return autoCorrelation / normalization; } }); } 

Y este es el stacktrace que obtengo:

 java.lang.IllegalStateException: Only one subscriber allowed! at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:124) at rx.internal.operators.BufferUntilSubscriber$OnSubscribeAction.call(BufferUntilSubscriber.java:81) at rx.Observable.unsafeSubscribe(Observable.java:7304) at rx.internal.operators.OperatorZip$Zip.start(OperatorZip.java:210) at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:154) at rx.internal.operators.OperatorZip$ZipSubscriber.onNext(OperatorZip.java:120) at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:41) at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:30) at rx.Observable$1.call(Observable.java:145) at rx.Observable$1.call(Observable.java:137) at rx.Observable.unsafeSubscribe(Observable.java:7304) at rx.internal.operators.OperatorMerge$MergeSubscriber.handleNewSource(OperatorMerge.java:188) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:158) at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:93) at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) at rx.internal.operators.OperatorScan$2.onNext(OperatorScan.java:110) at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:173) at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255) at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326) at rx.internal.operators.OperatorZip$Zip.tick(OperatorZip.java:255) at rx.internal.operators.OperatorZip$Zip$InnerSubscriber.onNext(OperatorZip.java:326) at rx.internal.operators.OperatorMerge$InnerSubscriber.emit(OperatorMerge.java:635) at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:545) at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150) at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:98) at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) at rx.internal.operators.BufferUntilSubscriber.onCompleted(BufferUntilSubscriber.java:161) at rx.internal.operators.OperatorWindowWithSize$InexactSubscriber.onNext(OperatorWindowWithSize.java:183) at rx.internal.operators.OperatorSkip$1.onNext(OperatorSkip.java:58) at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55) at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224) at rx.subjects.PublishSubject.onNext(PublishSubject.java:121) at com.github.joopaue.smartphonesensing.SensorService$3.onSensorChanged(SensorService.java:102) at android.hardware.SystemSensorManager$SensorEventQueue.dispatchSensorEvent(SystemSensorManager.java:418) at android.os.MessageQueue.nativePollOnce(Native Method) at android.os.MessageQueue.next(MessageQueue.java:138) at android.os.Looper.loop(Looper.java:123) at android.app.ActivityThread.main(ActivityThread.java:5146) at java.lang.reflect.Method.invokeNative(Native Method) at java.lang.reflect.Method.invoke(Method.java:515) at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:732) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:566) at dalvik.system.NativeStart.main(Native Method) 

No creo que esté haciendo nada extraño aquí: algunas cremalleras, reduce, escanea y planos planos.

¿Estoy perdiendo algo completamente obvio, hay alguna regla oculta que estoy rompiendo aquí o es un error en RxJava? ¡Gracias!

PD. Si falta algún código para que puedas sacar tus conclusiones, solo pregunta y publicaré!

En RxJava, los operadores groupBy y window devuelven un observable que sólo se puede suscribir una vez y si se suscribe, reproducen su contenido acumulado al único suscriptor y pasan al modo 'hot'.

Éste era un intercambio entre devolver un observable completamente caliente y los valores faltantes del riesgo o devolver una observación ilimitada del replaying que permite a cualquier suscriptores pero retiene el contenido acumulado indefinidamente.

Se piensa que el punto medio, es decir, un único suscriptor, observable frío-caliente, es el comportamiento menos sorprendente y da al desarrollador la opción de aplicar otros operadores y elegir cualquier punto entre los dos extremos:

 source.window(1, TimeUnit.SECONDS) .map(w -> w.publish()) .doOnNext(w -> w.connect()) .subscribe(...) source.window(1, TimeUnit.SECONDS) .map(w -> w.cache()) .subscribe(...) 
  • En RxJava, cómo reintentar / reanudar en caso de error, en lugar de completar el observable
  • Android - RxJava vs AsyncTask para evitar la pérdida de memoria de getActivity ()
  • Cómo anular los Observables compartidos, infinitos con un retraso después de que el último suscriptor no se haya suscrito
  • RxJava, buen caso de uso de flatmap
  • RxJava como bus de eventos en proyecto Android - eliminar evento del bus
  • Android Studio que agrega la biblioteca de rxjava
  • Cancelar la suscripción de un rx.Single en RxJava
  • Netflix vs ReactiveX RxJava-Biblioteca de Android
  • Filtrar lista de objetos en Rxjava
  • Solo observable con varios suscriptores
  • RxJava con presentador y fragmento retenido para cambios de configuración
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.