Utilizar RxJava para encadenar la solicitud en un solo hilo

Estoy guardando la ubicación del usuario en la base de datos local de la aplicación y luego la envío al servidor. Una vez que el servidor devuelve un éxito, elimino la ubicación que se envió.

Cada vez que un punto se ha guardado en la base de datos llamo a este método:

public void sendPoint(){ amazonRetrofit.postAmazonPoints(databaseHelper.getPoints()) .map(listIdsSent -> deleteDatabasePoints(listIdsSent)) .doOnCompleted(() -> emitStoreChange(finalEvent)) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(AndroidSchedulers.from(backgroundLooper)) .subscribe(); } 
  1. Consultar la base de datos para el punto que se enviará al servidor
  2. Recibí del servidor la lista de puntos enviados correctamente
  3. Usando .map() , recojo el punto enviado con éxito y los suprimo de la base de datos local

A veces, sucede que llamo este método repetidamente sin tener que esperar para que la petición anterior sea completada y suprimido el punto enviado. Por lo tanto, cuando vuelva a llamar a ese método, se publicará el mismo punto que la solicitud anterior, ya que la solicitud anterior no se ha completado aún así no han borrado el punto utilizando el .map() todavía. Causa del servidor para recibir duplicados …

Cronograma


  • Primera llamada a postPoint()
  • Recuperar punto A, B, C de la base de datos
  • Coloque los puntos A, B, C en el servidor
  • Segunda llamada a postPoint()
  • Recuperar punto A, B, C, D de la base de datos
  • Coloque los puntos A, B, C, D en el servidor
  • Recibe el éxito de la primera solicitud
  • Borrar A, B, C de la base de datos local
  • Recibe el éxito de la segunda solicitud
  • Borrar A, B, C, D de la base de datos local

Resultado:

La base de datos del servidor ahora ha recibido: A, B, C, A, B, C, D

Cada solicitud se produce secuencialmente pero de alguna manera los mismos puntos de ubicación se envían al servidor cuando llamo a sendPoint() demasiado rápido. ¿Cómo puedo arreglar esto?

En primer lugar a todo lo que no está utilizando operador observerOn correctamente, el operador observerOn se aplica sobre los pasos en su tubería, una vez que se define. Por lo tanto, si define al final de la canalización justo antes de subscribeOn, ninguno de sus pasos anteriores se ejecutará en ese subproceso.

Además, dado que necesita esperar hasta la respuesta de su llamada al servidor, puede utilizar los manejadores de devoluciones de llamada que el suscriptor ya proporciona (onNext (), onComplete ())

  public void sendPoint(){ Observable.from(databaseHelper.getPoints()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(poins-> amazonRetrofit.postAmazonPoints(points)) .subscribeOn(AndroidSchedulers.from(backgroundLooper)) .subscribe(listIdsSent-> deleteDatabasePoints(listIdsSent), () -> emitStoreChange(finalEvent)); } 

si desea ver más ejemplos de ObserverOn y SubscribeOn puede echar un vistazo aquí. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

Debe tener algún tipo de validación en el lado del cliente o / y en el lado del backend.

Lado del cliente:
La solución más sencilla es agregar dos columnas a la tabla con ubicaciones como "procesamiento" y "subido". Cuando selecciona ubicaciones de base de datos y clausure where processing=false and uploaded=false . Entonces, cuando usted tiene filas listas para enviar set processing=true y cuando el servidor devuelve el éxito set done=true .

Parte posterior (opcional, depende de los requisitos):
Debe enviar la ubicación con la marca de tiempo al servidor (probablemente una columna más adicional en la tabla del lado del cliente). Si el servidor obtiene una ubicación con una marca de tiempo más antigua que la última en una base de datos, no debería almacenarla.

Solución RxJava:
Puede implementar una solución similar con caché de memoria que se mantiene alrededor de todo el sendPoint como List .

Pseudocódigo:

 public void sendPoint(){ databaseHelper.getPoints() .filter(points -> pointsNotInCache()) .map(points -> amazonRetrofit.postAmazonPoints()) .map(points -> addToCache()) .map(listIdsSent -> deleteDatabasePoints(listIdsSent)) .map(listIdsSent -> removeSentPointsFromCache()) //if you would like save memory .doOnCompleted(() -> emitStoreChange(finalEvent)) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(AndroidSchedulers.from(backgroundLooper)) .subscribe(); } 

Parece que, como todo el mundo está diciendo, se necesita un caché intermedio.

es decir

 HashSet<Point> mHashSet = new HashSet<>(); public void sendPoint() { Observable.from(databaseHelper.getPoints()) .filter(point -> !mHashSet.contains(point)) .doOnNext(mHashSet::put) .toList() .flatMap(amazonRetrofit::postAmazonPoints) .map(this::deleteDatabasePoints) .doOnCompleted(() -> emitStoreChange(finalEvent)) .observeOn(AndroidSchedulers.mainThread()) .subscribeOn(AndroidSchedulers.from(backgroundLooper)) .subscribe(); } 
  • ¿Cómo ejecutaría esta sentencia usando RxJava?
  • RxAndroid: ¿cuándo usar bindActivity y por qué?
  • Encadenamiento de servicios de Retrofit con soporte de RxJava
  • Patrón de repositorio con SqlBrite / SqlDelight (base de datos sin conexión) y Retrofit (solicitud Http)
  • RxJava, buen caso de uso de flatmap
  • ¿Es posible obtener 2 valores en onNext () de suscriptor en rxjava android?
  • Prueba de desplazamiento sin fin RecyclerView con Espresso y RxJava
  • Prueba de Android Realm con RxJava - "abierto desde un hilo sin un Looper" Excepción
  • Rx java retrofit 2 manejo de errores
  • Cómo utilizar rxandroid para escuchar la actualización de ubicación de gps
  • Arquitectura de la aplicación de Android: RxJava
  • FlipAndroid es un fan de Google para Android, Todo sobre Android Phones, Android Wear, Android Dev y Aplicaciones para Android Aplicaciones.