Solo observable con varios suscriptores

Tengo un Observable<<List<Foo>> getFoo() que se crea a partir de un Servicio de Retrofit y después de llamar al método .getFoo() , necesito compartirlo con Multiple Subscribers. .share() embargo, al llamar al método .share() , la llamada de red se vuelve a ejecutar. El Operador de Repetición no funciona tampoco. Sé que una posible solución podría ser. .cache() , pero no sé por qué se produce este comportamiento.

 Retrofit retrofit = new Retrofit.Builder() .baseUrl(API_URL) .addConverterFactory(GsonConverterFactory.create()) .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) .build(); // Create an instance of our GitHub API interface. // Create a call instance for looking up Retrofit contributors. Observable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors + " -> 2"); } }); subscription1.unsubscribe(); subscription2.unsubscribe(); 

El código anterior puede reproducir el comportamiento antes mencionado. Puede depurarlo y ver que las listas recibidas pertenecen a un MemoryAddress diferente.

También he mirado ConnectableObservables como una solución potencial, pero esto me obliga a llevar el original observable alrededor, y llamando .connect() cada vez que quiero añadir un nuevo suscriptor.

Este tipo de comportamiento con el .share() estaba funcionando bien hasta Retrofit 1.9. Dejó de funcionar en Retrofit 2 – beta. Todavía no lo he probado con la versión de lanzamiento de Retrofit 2, que fue lanzada hace algunas horas.

EDIT: 01/02/2017

Para futuros lectores, he escrito un artículo aquí explicando más sobre el caso!

Usted parece estar (implícitamente) lanzando su ConnectedObservable devuelto por .share() nuevo en un Observable normal. Es posible que desee leer sobre la diferencia entre observables caliente y frío.

Tratar

 ConnectedObservable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share(); Subscription subscription1 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors); } }); Subscription subscription2 = testObservable .subscribe(new Subscriber<List<Contributor>>() { @Override public void onCompleted() { } @Override public void onError(Throwable throwable) { } @Override public void onNext(List<Contributor> contributors) { System.out.println(contributors + " -> 2"); } }); testObservable.connect(); subscription1.unsubscribe(); subscription2.unsubscribe(); 

Editar: No es necesario llamar a connect() cada vez que desee una nueva suscripción, solo lo necesita para iniciar el observable. Supongo que podría usar replay() para asegurarse de que todos los suscriptores posteriores obtengan todos los elementos producidos

 ConnectedObservable<List<Contributor>> testObservable = retrofit .create(GitHub.class) .contributors("square", "retrofit") .share() .replay() 

Después de revisar con el desarrollador de RxJava Dávid Karnok Me gustaría proponer una explicación completa de lo que estaba pasando aquí.

publish().refCount() , es decir, la fuente Observable se transforma primero en un ConnectableObservable por publish() pero en lugar de tener que llamar a connect() "manualmente" esa parte es manejada por refCount() . En particular, refCount llamará connect() en el ConnectableObservable cuando recibe la primera suscripción; Entonces, mientras haya por lo menos un suscriptor permanecerá suscrito; Y, finalmente, cuando el número de suscriptores cae a 0, se cancelará la suscripción hacia arriba. Con Observables fríos , como los que devuelve Retrofit, esto detendrá cualquier cálculo en ejecución.

Si después de uno de estos ciclos aparece otro abonado, refCount volverá a llamar a connect y así activar una nueva suscripción a la fuente Observable. En este caso, activará otra solicitud de red.

Ahora, esto generalmente no se hizo evidente con Retrofit 1 (y, de hecho, cualquier versión antes de este commit ), porque estas versiones anteriores de Retrofit por defecto movido todas las solicitudes de red a otro hilo. Esto generalmente significa que todas sus llamadas de subscribe() ocurrirían mientras la primera solicitud / Observable todavía estaba en ejecución y por lo tanto, el nuevo Subscriber simplemente se agregaría al refCount y por lo tanto no refCount solicitudes adicionales / Observables .

Las versiones más recientes de Retrofit, sin embargo, no mueven el trabajo por defecto a otro hilo más – usted tiene que hacerlo explícitamente llamando, por ejemplo, subscribeOn(Schedulers.io()) . Si no lo hace, todo permanecerá en el subproceso actual, lo que significa que el segundo subscribe() sólo ocurrirá después de que el primer Observable haya llamado onCompleted y, por lo tanto, después de que todos los Subscribers hayan cancelado y todo se cierre. Ahora, como vimos en el primer párrafo, cuando el segundo se subscribe() se llama, share() no tiene otra opción que causar otra Subscription a la fuente Observable y desencadenar otra solicitud de red.

Por lo tanto, para volver al comportamiento que está acostumbrado a Retrofit 1, sólo tiene que añadir subscribeOn(Schedulers.io()) .

Esto sólo debería dar lugar a que la solicitud de red se ejecute – la mayor parte del tiempo. En principio, sin embargo, todavía puede obtener múltiples solicitudes (y siempre podría tener con Retrofit 1), pero sólo si sus solicitudes de red son extremadamente rápido y / o subscribe() llamadas se producen con un retraso considerable, por lo que, de nuevo, la primera La solicitud finaliza cuando ocurre la segunda subscribe() .

Por lo tanto, Dávid sugiere utilizar cache() (pero tiene los inconvenientes que mencionó) o replay().autoConnect() . Según estas notas de lanzamiento , autoConnect funciona como sólo la primera mitad de refCount , o más precisamente, es

Similar en el comportamiento a refCount (), excepto que no se desconecta cuando los suscriptores se pierden.

Esto significa que la solicitud sólo se activaría cuando se produzca la primera subscribe() pero luego todos los Subscriber posteriores recibirían todos los elementos emitidos, independientemente de si hubo, en cualquier momento entre 0 suscriptores.

  • No se puede convertir el objeto Subclase en cuerpo de solicitud json en Retrofit 2.0 con GsonConverterFactory
  • Realm + Retrofit + Rxjava
  • Manera eficiente de manipular los subprocesos RxJava
  • ¿Por qué necesita onBackpressure () aquí para eventos de clic?
  • Uso de "skipWhile" combinado con "repeatWhen" en RxJava para implementar el sondeo de servidor
  • Habilitación del botón cuando EditText tiene texto (RxAndroid)
  • Cómo manejar diferentes tipos de errores en Retrofit Rx onError sin instancia fea
  • RxJava se vuelve a suscribir al evento tras la restauración de la actividad
  • Cómo encadenar los métodos groupBy () de RxJava como groupBy (). GroupBy ()
  • Rx java retrofit 2 manejo de errores
  • Schedulers.immediate () no funciona con las pruebas de gradle de la línea de comandos
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.