Solo observable con varios suscriptores
Tengo un Observable<<List<Foo>> getFoo()
que se crea a partir de un Servicio de Retrofit y después de llamar al método .getFoo()
, necesito compartirlo con Multiple Subscribers. .share()
embargo, al llamar al método .share()
, la llamada de red se vuelve a ejecutar. El Operador de Repetición no funciona tampoco. Sé que una posible solución podría ser. .cache()
, pero no sé por qué se produce este comportamiento.
Retrofit retrofit = new Retrofit.Builder() .baseUrl(API_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); // Create an instance of our GitHub API interface. // Create a call instance for looking up Retrofit contributors. Observable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors + " -> 2"); } }); subscription1.unsubscribe(); subscription2.unsubscribe();
El código anterior puede reproducir el comportamiento antes mencionado. Puede depurarlo y ver que las listas recibidas pertenecen a un MemoryAddress diferente.
- Encadenamiento de RxJava observables con callbacks / listeners
- ¿Por qué debounce () con toList () no funciona en RxAndroid?
- ¿Por qué RxJava con Retrofit en Android doOnError () no funciona, pero el suscriptor onError no
- ¿Cómo estructurar mi aplicación usando MVP con rxjava y retrofit para obtener datos de Observables?
- Retrofit / Rxjava y servicios basados en sesiones
También he mirado ConnectableObservables como una solución potencial, pero esto me obliga a llevar el original observable alrededor, y llamando .connect()
cada vez que quiero añadir un nuevo suscriptor.
Este tipo de comportamiento con el .share()
estaba funcionando bien hasta Retrofit 1.9. Dejó de funcionar en Retrofit 2 – beta. Todavía no lo he probado con la versión de lanzamiento de Retrofit 2, que fue lanzada hace algunas horas.
EDIT: 01/02/2017
Para futuros lectores, he escrito un artículo aquí explicando más sobre el caso!
- InterruptedIOException al usar Retrofit2 con rx cuando retryOn
- .debounce () de RxJava interfiriendo con los hilos de mis observables y manejo de errores
- RxAndroid filter Observable <List <Item >>
- RxJava HashMap Transformación
- ¿Cómo crear un observable de Hashmap?
- Unidad de prueba de la aplicación android con retrofit y rxjava
- RxJava, ejecuta código en el hilo del observador antes de encadenar dos observables
- RxJava: Cómo aplicar condicionalmente a los operadores a un observable sin romper la cadena
Usted parece estar (implícitamente) lanzando su ConnectedObservable
devuelto por .share()
nuevo en un Observable
normal. Es posible que desee leer sobre la diferencia entre observables caliente y frío.
Tratar
ConnectedObservable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors + " -> 2"); } }); testObservable.connect(); subscription1.unsubscribe(); subscription2.unsubscribe();
Editar: No es necesario llamar a connect()
cada vez que desee una nueva suscripción, solo lo necesita para iniciar el observable. Supongo que podría usar replay()
para asegurarse de que todos los suscriptores posteriores obtengan todos los elementos producidos
ConnectedObservable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share() .replay()
Después de revisar con el desarrollador de RxJava Dávid Karnok Me gustaría proponer una explicación completa de lo que estaba pasando aquí.
publish().refCount()
, es decir, la fuente Observable
se transforma primero en un ConnectableObservable
por publish()
pero en lugar de tener que llamar a connect()
"manualmente" esa parte es manejada por refCount()
. En particular, refCount
llamará connect()
en el ConnectableObservable
cuando recibe la primera suscripción; Entonces, mientras haya por lo menos un suscriptor permanecerá suscrito; Y, finalmente, cuando el número de suscriptores cae a 0, se cancelará la suscripción hacia arriba. Con Observables
fríos , como los que devuelve Retrofit, esto detendrá cualquier cálculo en ejecución.
Si después de uno de estos ciclos aparece otro abonado, refCount
volverá a llamar a connect
y así activar una nueva suscripción a la fuente Observable. En este caso, activará otra solicitud de red.
Ahora, esto generalmente no se hizo evidente con Retrofit 1 (y, de hecho, cualquier versión antes de este commit ), porque estas versiones anteriores de Retrofit por defecto movido todas las solicitudes de red a otro hilo. Esto generalmente significa que todas sus llamadas de subscribe()
ocurrirían mientras la primera solicitud / Observable
todavía estaba en ejecución y por lo tanto, el nuevo Subscriber
simplemente se agregaría al refCount
y por lo tanto no refCount
solicitudes adicionales / Observables
.
Las versiones más recientes de Retrofit, sin embargo, no mueven el trabajo por defecto a otro hilo más – usted tiene que hacerlo explícitamente llamando, por ejemplo, subscribeOn(Schedulers.io())
. Si no lo hace, todo permanecerá en el subproceso actual, lo que significa que el segundo subscribe()
sólo ocurrirá después de que el primer Observable
haya llamado onCompleted
y, por lo tanto, después de que todos los Subscribers
hayan cancelado y todo se cierre. Ahora, como vimos en el primer párrafo, cuando el segundo se subscribe()
se llama, share()
no tiene otra opción que causar otra Subscription
a la fuente Observable y desencadenar otra solicitud de red.
Por lo tanto, para volver al comportamiento que está acostumbrado a Retrofit 1, sólo tiene que añadir subscribeOn(Schedulers.io())
.
Esto sólo debería dar lugar a que la solicitud de red se ejecute – la mayor parte del tiempo. En principio, sin embargo, todavía puede obtener múltiples solicitudes (y siempre podría tener con Retrofit 1), pero sólo si sus solicitudes de red son extremadamente rápido y / o subscribe()
llamadas se producen con un retraso considerable, por lo que, de nuevo, la primera La solicitud finaliza cuando ocurre la segunda subscribe()
.
Por lo tanto, Dávid sugiere utilizar cache()
(pero tiene los inconvenientes que mencionó) o replay().autoConnect()
. Según estas notas de lanzamiento , autoConnect
funciona como sólo la primera mitad de refCount
, o más precisamente, es
Similar en el comportamiento a refCount (), excepto que no se desconecta cuando los suscriptores se pierden.
Esto significa que la solicitud sólo se activaría cuando se produzca la primera subscribe()
pero luego todos los Subscriber
posteriores recibirían todos los elementos emitidos, independientemente de si hubo, en cualquier momento entre 0 suscriptores.