Realizar N llamadas de api secuenciales usando RxJava y Retrofit

Tengo una lista de archivos que me gustaría subir al backend desde un dispositivo Android. Debido a restricciones de memoria, me gustaría hacer la segunda llamada de API sólo después de la primera terminó, la tercera después de la segunda terminó, y así sucesivamente.

Escribí algo así como

private Observable<Integer> uploadFiles(List<File> files) { return Observable.create(subscriber -> { for (int i = 0, size = files.size(); i < size; i++) { UploadModel uploadModel = new UploadModel(files.get(0)); int uploadResult = retrofitApi.uploadSynchronously(uploadModel); subscriber.onNext(uploadResult); } subscriber.onCompleted(); }).subscribeOn(Schedulers.newThread()); } 

Pero siento que esto podría ir en contra del espíritu de Rx, y el dicho es que si estás usando Observable.create, probablemente lo estás haciendo mal … ¿Es esto un enfoque razonable? ¿Existe una mejor manera de lograr esto con la integración RxJava de Retrofit?

Naively, yo haría eso (no funciona, sin embargo, ver más abajo):

 return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel)); 

Ahora el problema es que no hay manera de decir retrofit para usar sólo un hilo para esas llamadas.

reduce , sin embargo, pasa el resultado de una llamada de función a la siguiente, junto con el siguiente valor emitido de la observable original. Eso funcionaría, pero la función que se pasa para reduce necesita ser sincrónica. No está bien.

Otro enfoque sería modificar el observable recursivamente:

 void getNextFile(int i) { return retrofit.upload(i). onNext(result -> getNextFile(i + 1)); } 

aproximadamente. Pero no estoy seguro de cómo limpiarlo para hacerlo más legible.

El más limpio que creo que sería algo así como:

 Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file))); 

Los nativos de RxJava emitirían todos los elementos de Observable.from(...) como si estuvieran en paralelo. Esa es la mejor manera de pensar en ella como una emisión paralela. Sin embargo, algunos casos requieren una ejecución consecuente real de toda la cadena. He llegado a la siguiente solución, probablemente no el mejor, pero el trabajo.

 import rx.Observable; import rx.Subscriber; import java.util.Iterator; import java.util.function.Function; public class Rx { public static void ignore(Object arg) { } public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) { return Observable.create(collectorSubscriber -> Observable.<Void>create(producerSubscriber -> producerSubscriber.setProducer(ignoredCount -> { if (!iterator.hasNext()) { producerSubscriber.onCompleted(); return; } E model = iterator.next(); action.apply(model) .subscribe( Rx::ignore, producerSubscriber::onError, () -> producerSubscriber.onNext(null)); })) .subscribe(new Subscriber<Void>() { @Override public void onStart() { request(1); } @Override public void onCompleted() { collectorSubscriber.onNext(null); collectorSubscriber.onCompleted(); } @Override public void onError(Throwable e) { collectorSubscriber.onError(e); } @Override public void onNext(Void aVoid) { request(1); } })); } } 

Ejemplo de uso sería:

  Iterator<? extends Model> iterator = models.iterator(); Rx.sequential(iterator, model -> someFunctionReturnsObservable(model)) .subscribe(...); 

Este método garantiza ejecuciones encadenadas de

Observable<Dummy> someFunctionReturnsObservable(Model model)

Actualmente, la forma preferida de crear observables es con fromAsync:

 Observable.fromAsync(new Action1<AsyncEmitter<Object>>() { @Override public void call(final AsyncEmitter<Object> emitter) { emitter.onNext(object); emitter.onCompleted(); emitter.setCancellation(new AsyncEmitter.Cancellable() { @Override public void cancel() throws Exception { // on unSubscribe() callback } }); } }, AsyncEmitter.BackpressureMode.BUFFER); 
  • RxJava alternativa para el operador map () para guardar los elementos emitidos
  • ¿Existe algún patrón de desarrollo que pueda reemplazar un IntentService para las solicitudes de red?
  • RxJava / RxAndroid - maneja varios cambios de EditText
  • Creando Observable sin utilizar Observable.create
  • Rx 2 Android ¿qué es mejor Single o Observable para llamadas api?
  • ¿Por qué debounce () con toList () no funciona en RxAndroid?
  • Retrofit / Rxjava y servicios basados ​​en sesiones
  • ¿Por qué RxJava con Retrofit en Android doOnError () no funciona, pero el suscriptor onError no
  • RxJava: ¿Cómo puedo hacer una observación de búsqueda-una vez-y-reutilización?
  • Encadenar dos observables de adaptación con RxJava
  • Manejador de errores por defecto de RxJava
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.