Comportamiento de onNext y onComplete

Tengo un Observable que hace algo sin la necesidad de emitir un valor. También tengo una lista de objetos que quiero que el Observable para trabajar. Así que para todos los elementos de esta lista: doSomething ()

Observable.from(uris) .flatMap(new Func1<Uri, Observable<Void>>() { @Override public Observable<Void> call(Uri uri) { return createDoSomethingObservable(uri); } }) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Observer<Void>() { @Override public void onCompleted() { Log.d(TAG, "completed"); } @Override public void onError(Throwable e) { } @Override public void onNext(Void aVoid) { Log.d(TAG, "next"); } }); 

Y el método que crea el Observable:

 Observable<Void> createDoSomethingObservable(final Uri uri) { return Observable.create(new Observable.OnSubscribe<Void>() { @Override public void call(Subscriber<? super Void> subscriber) { //doSomething subscriber.onNext(null); subscriber.onCompleted(); } }); } 

Ahora, cuando ejecuto esto con una lista con 3 elementos obtengo:

 next next next completed 

Que es bueno, porque eso es lo que quería, pero no sé por qué está funcionando. Primero empecé a llamar a onComplete, porque al final el observable hace su trabajo y termina. Pero luego, por supuesto, onNext nunca se llama al suscriptor. Lo mismo ocurre con la otra forma.

Así que mis preguntas son:

  1. ¿Por qué onComplete solo llama al último elemento de lista?
  2. ¿Hay una mejor manera de resolver esto?

onComplete se llama para el último elemento porque eso es cuando el más temprano observable en la cadena ( from(uris) ) ha terminado.

Se espera que sus observables emitidos por flatMap llamen a onComplete . Una vez que se hace (y la call ha regresado), entonces la próxima emisión de from se puede trabajar. Una vez que ha terminado de emitir observables, llama a onComplete y la cadena está terminada, efectivamente.

Creo que ese pequeño código te ayuda a entender el comportamiento de onNext( ) y onComplete() .
Supongamos que tiene una List<Uri> . Vamos a transformarlo a Observable<Uri> manualmente .

 public static Observable<Uri> getUries(List<Uri> uriList){ return Observable.create(new Observable.OnSubscribe<Uri>() { @Override public void call(Subscriber<? super Uri> subscriber) { for(Uri uri : uriList){ subscriber.onNext(uri); } subscriber.onCompleted(); } }); } 

O usar expresiones Lambda:

 public static Observable<Uri> getUries(List<Uri> uriList){ return Observable.create(subscriber -> { for(Uri uri : uriList){ subscriber.onNext(uri); } subscriber.onCompleted(); }); } 

Como se puede ver, estamos iterando la lista de entrada, y llamamos onNext( ) para cada elemento, y cuando terminamos de transformar nuestra List a Observable , llamamos onComplete()

PS Este código sólo una demostración, por favor, nunca la utilice para transformar la List a Observable . Utilice el operador Observable.from() para ello.

ACTUALIZACIÓN :

Operador from( ) implementación:

 ... while (true) { if (o.isUnsubscribed()) { return; } else if (it.hasNext()) { o.onNext(it.next()); } else if (!o.isUnsubscribed()) { o.onCompleted(); return; } else { // is unsubscribed return; } } ... 

Enlace: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java#L75-L87

onComplete (igual que onError ) se llama sólo una vez durante la cadena observable, es la forma en que rxjava se implementa

Pienso que su acercamiento es correcto, así que la mejor manera no es necesaria.

  • Uso de "skipWhile" combinado con "repeatWhen" en RxJava para implementar el sondeo de servidor
  • ¿Cómo estructurar mi aplicación usando MVP con rxjava y retrofit para obtener datos de Observables?
  • Prueba de desplazamiento sin fin RecyclerView con Espresso y RxJava
  • ¿Cómo ejecutar 2 consultas secuencialmente en un Android RxJava Observable?
  • Usando observeOn () el hilo de interfaz de usuario de Android bloquea mi prueba de emulador
  • Android - RxJava vs AsyncTask para evitar la pérdida de memoria de getActivity ()
  • Habilitación del botón cuando EditText tiene texto (RxAndroid)
  • ¿Por qué Rxjava podría causar fugas de memoria?
  • ¿Qué está causando HTTP FALLA: java.net.SocketException: Socket cerrado?
  • RXJava - ejecuta múltiples observables tras otro (como concat, pero con onCompleted para cada observable)
  • Hilo predeterminado de actualización
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.