2016-07-06 7 views
4

Ich habe eine Schlafmethode, um einen lang laufenden Prozess zu simulieren.Wie kann ich diesen rxjava zip parallel laufen lassen?

private void sleep() { 
    try { 
     Thread.sleep(2000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

Dann habe ich eine Methode gibt eine Observable mit einer Liste von 2 Zeichenfolgen, die in den Parametern angegeben ist. Er ruft den Schlaf auf, bevor er die Saiten zurückbringt.

private Observable<List<String>> getStrings(final String str1, final String str2) { 
    return Observable.fromCallable(new Callable<List<String>>() { 
     @Override 
     public List<String> call() { 
      sleep(); 
      List<String> strings = new ArrayList<>(); 
      strings.add(str1); 
      strings.add(str2); 
      return strings; 
     } 
    }); 
} 

Dann bin ich die getStrings dreimal in Observalb.zip Aufruf erwarte ich, dass diese drei Anrufe parallel laufen zu lassen, so sollte die gesamte Zeit der Ausführung sein, innerhalb von 2 Sekunden oder vielleicht 3 Sekunden am meisten, weil der Schlaf war nur 2 Sekunden. Es dauert jedoch insgesamt sechs Sekunden. Wie kann ich das parallel machen, damit es innerhalb von 2 Sekunden fertig ist?

Observable 
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists()) 
.subscribeOn(Schedulers.io()) 
.observeOn(AndroidSchedulers.mainThread()) 
.subscribe(new Observer<List<String>>() { 
    @Override 
    public void onCompleted() { 

    } 

    @Override 
    public void onError(Throwable e) { 

    } 

    @Override 
    public void onNext(List<String> strings) { 
     //Display the strings 
    } 
}); 

Die mergeStringLists Methode

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() { 
    return new Func3<List<String>, List<String>, List<String>, List<String>>() { 
     @Override 
     public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) { 
      Log.d(TAG, "..."); 

      for (String s : strings2) { 
       strings.add(s); 
      } 

      for (String s : strings3) { 
       strings.add(s); 
      } 

      return strings; 
     } 
    }; 
} 

Antwort

7

Das geschieht, weil geschieht in dem die gleichen, io Thread zu Ihrem zipped beobachtbaren abonnieren möchte.

Warum Sie dies nicht stattdessen versuchen:

Observable 
    .zip(
     getStrings("One", "Two") 
      .subscribeOn(Schedulers.newThread()), 
     getStrings("Three", "Four") 
      .subscribeOn(Schedulers.newThread()), 
     getStrings("Five", "Six") 
      .subscribeOn(Schedulers.newThread()), 
     mergeStringLists()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    .subscribe(new Observer<List<String>>() { 
     @Override 
     public void onCompleted() { 

     } 

     @Override 
     public void onError(Throwable e) { 

     } 

     @Override 
     public void onNext(List<String> strings) { 
      //Display the strings 
     } 
    }); 

Lassen Sie mich wissen, ob das

+1

' Schedulers.io() zu sehen ist Standardmäßig ein Thread-Pool, der nach Bedarf wächst. –

+0

@TassosBassoukos meinst du, dass Schedulers.io() neue Threads automatisch nach Bedarf erstellt? –

+0

@Bartek Ihre Lösung hat funktioniert, wissen Sie, ob es andere Lösungen als Ihre gibt, damit die Zip parallel läuft? –

1

Hier half ich habe ein Beispiel, das ich Zip in asynchroner Weise hat verwenden, nur für den Fall du bist neugierig

  /** 
      * Since every observable into the zip is created to     subscribeOn a diferent thread, it´s means all of them will run in parallel. 
      * By default Rx is not async, only if you explicitly use subscribeOn. 
       */ 
      @Test 
      public void testAsyncZip() { 
       scheduler = Schedulers.newThread(); 
       scheduler1 = Schedulers.newThread(); 
       scheduler2 = Schedulers.newThread(); 
       long start = System.currentTimeMillis(); 
       Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                        .concat(s3)) 
      .subscribe(result -> showResult("Async in:", start, result)); 
      } 



     public Observable<String> obAsyncString() { 
return Observable.just("") 
       .observeOn(scheduler) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> "Hello"); 
     } 

     public Observable<String> obAsyncString1() { 
return Observable.just("") 
       .observeOn(scheduler1) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> " World"); 
     } 

     public Observable<String> obAsyncString2() { 
return Observable.just("") 
       .observeOn(scheduler2) 
       .doOnNext(val -> { 
        System.out.println("Thread " + Thread.currentThread() 
                  .getName()); 
       }) 
       .map(val -> "!"); 
     } 

Sie hier weitere Beispiele `https://github.com/politrons/reactive

+0

bitte formatieren Sie den Code –