2016-04-11 7 views
12

Ich entwickle Anwendung für Android mit Hintergrunddatensynchronisation. Ich verwende derzeit RxJava, um einige Daten auf dem Server in regelmäßigen Abständen zu veröffentlichen. Ansonsten möchte ich dem Benutzer eine Schaltfläche "force sync" zur Verfügung stellen, die die Synchronisierung sofort auslöst. Ich weiß, wie man Observable.interval() verwendet, um Daten in regelmäßigen Zeitintervallen zu pushen, und ich würde Observalbe.just() verwenden, um das zu drücken, das gezwungen wird, aber ich möchte sie in Warteschlange stellen, wenn so geschieht, dass man während des vorherigen noch ausgelöst wird läuft.Queuing Aufgaben mit RxJava in Android

Also nehmen wir ein Beispiel, wenn 1min Intervall der automatischen Synchronisierung ist, und sagen wir, dass die Synchronisierung 40sec dauert (ich übertreibe hier nur, um leichter Punkt zu machen). Wenn nun zufällig der Benutzer die "force" Taste drückt, während die Automatik noch läuft (oder umgekehrt - die automatische Triggerung, wenn die erzwungene noch läuft), würde ich gerne die zweite Sync-Anfrage in die Warteschlange stellen der erste endet.

habe ich dieses Bild ziehen, die etwas mehr Perspektive setzen kann:

enter image description here

Wie Sie sehen können, wird die automatische ausgelöst (von einigen Observable.interval()), und in der Mitte des Syncing, Benutzer drückt "force" -Taste. Jetzt wollen wir warten, bis die erste Anfrage beendet ist, und dann erneut für die erzwungene Anfrage beginnen. An einem Punkt, während die erzwungene Anfrage ausgeführt wurde, wurde die neue automatische Anfrage erneut ausgelöst, die es gerade in die Warteschlange aufgenommen hat. Nachdem der letzte aus der Warteschlange beendet wurde, hört alles auf, und dann wurde die Automatik später noch einmal eingeplant.

Hoffe jemand kann mich zeigen, um Operator zu korrigieren, wie dies zu tun ist. Ich habe versucht, mit Observable.combineLatest(), aber die Warteschlange-Liste wurde am Anfang ausgelöst und wenn ich neue Synchronisierung in die Warteschlange hinzugefügt wurde nicht fortgesetzt, wenn der vorherige Vorgang abgeschlossen wurde.

Jede Hilfe ist sehr geschätzt, Darko

Antwort

11

Sie diese mit der Taste Observable/Subject klicken Sie durch die Zusammenlegung der Timer tun können, die Warteschlangen Wirkung von onBackpressureBuffer und concatMap die Verarbeitung in sie, die, die man macht sicher verwenden läuft zu einer Zeit.

PublishSubject<Long> subject = PublishSubject.create(); 

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS); 

periodic.mergeWith(subject) 
.onBackpressureBuffer() 
.concatMap(new Func1<Long, Observable<Integer>>() { 
    @Override 
    public Observable<Integer> call(Long v) { 
     // simulates the task to run 
     return Observable.just(1) 
       .delay(300, TimeUnit.MILLISECONDS); 
    } 
} 
).subscribe(System.out::println, Throwable::printStackTrace); 

Thread.sleep(1100); 
// user clicks a button 
subject.onNext(-1L); 

Thread.sleep(800); 
+0

Ok, das ist eine gute Richtung, aber ich habe ein paar Fragen ... Was ist los mit der anonymen Funktion? Ich benutze nicht Java 8, weil ich für API-Niveau 16 entwickeln muss. Auch welche Linie simulieren Knopfdruck? –

+0

Das Lambda gibt an, wo die Synchronisierung stattfinden soll - in einer beobachtbaren Form. In der Datei subject.onNext() klickt der Benutzer auf eine Schaltfläche. – akarnokd

+0

Hinzufügen einer weiteren Frage, wenn Sie mir helfen können. Also habe ich dies als [@akarnokd] vorgeschlagen (http://stackoverflow.com/users/61158/akarnokd) vorgeschlagen und alles funktioniert gut, genau was ich brauche. Jetzt versuche ich 'onComplete()' aufzurufen, nachdem die Kette fertig ist. Wenn Sie sich das Bild oben ansehen, würde die erste Kette nach dem 3. Synchronisierungsprozess enden. Das liegt daran, dass ich vielleicht die Benutzeroberfläche nach dem Ende der Kette aktualisieren möchte, anstatt nach jeder Synchronisierung. –

3

Zwar gibt es eine akzeptierte Antwort mit einer guten Lösung würde Ich mag eine andere Option teilen diese mit Scheduler und SingleThreadExecutor zu tun

public static void main(String[] args) throws Exception { 
    System.out.println(" init "); 
    Observable<Long> numberObservable = 
      Observable.interval(700, TimeUnit.MILLISECONDS).take(10); 

    final Subject subject = PublishSubject.create(); 

    Executor executor = Executors.newSingleThreadExecutor(); 
    Scheduler scheduler = Schedulers.from(executor); 
    numberObservable.observeOn(scheduler).subscribe(subject); 

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), 
        onCompleteFunc("subscriber 1")); 

    Thread.sleep(800); 
    //simulate action 
    executor.execute(new Runnable() { 
     @Override 
     public void run() { 
      subject.onNext(333l); 
     } 
    }); 

    Thread.sleep(5000); 
} 

static Action1<Long> onNextFunc(final String who) { 
    return new Action1<Long>() { 
     public void call(Long x) { 
      System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName() 
        + " -- " + System.currentTimeMillis()); 
      try { 
       //simulate some work 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }; 
} 

static Action1<Throwable> onErrorFunc(final String who) { 
    return new Action1<Throwable>() { 
     public void call(Throwable t) { 
      t.printStackTrace(); 
     } 
    }; 
} 

static Action0 onCompleteFunc(final String who) { 
    return new Action0() { 
     public void call() { 
      System.out.println(who + " complete"); 
     } 
    }; 
}