2016-06-16 3 views
1

Ich bin neu in RX Java, so dass ich wahrscheinlich etwas sehr Grundlegendes vermisse. In dem Codebeispiel unten, was ich würde wie geschehen ist:RX Java Run Observablen auf mehrere verschiedene Threads

  1. SampleController empfängt Anfrage auf http-nio Thread

  2. CompositeService Verarbeitung läuft auf einem neuen Thread A und gibt die nio Anfrage Faden.

  3. CompositeService ruft Helloservice, die auf Gewinde B. ein Netzwerkanruf macht

  4. CompositeService ruft World die auf Gewinde C. ein Netzwerk Anruf tätigt

3 und 4 gleichzeitig laufen, und wenn das Ergebnisse sind bereit, wir verwenden die Ergebnisse, um einen Netzwerkaufruf auf Thread A.

Stattdessen was ich sehe, ist, dass 3 und 4 nacheinander auf dem http-Nio-Thread ausführen, und nur CompositeService ausführen s auf einem neuen Thread. Es scheint, dass meine subscribeOn Anrufe in 3 und 4 keine Wirkung haben. Wie bekomme ich 3 und 4 gleichzeitig laufen?

SampleController:

@RestController 
@RequestMapping("/rx-java-sample") 
public class SampleController { 

    private static Logger log = LoggerFactory.getLogger(SampleController.class); 

    @Autowired 
    private CompositeService compositeService; 

    @RequestMapping(method = RequestMethod.GET, 
     produces = { MediaType.APPLICATION_JSON_VALUE }) 
    public DeferredResult<String> getCompositeString() 
        throws ApiGatewayException, ApiValidationException { 
     log.info("Received getCompositeObject request"); 

     Observable<String> compositeObject = compositeService.getCompositeString(); 

     return toDeferredResult(compositeObject); 
    } 

    private DeferredResult<String> toDeferredResult(Observable<String> compositeObject) { 
     DeferredResult<String> result = new DeferredResult<String>(); 

     compositeObject.subscribeOn(Schedulers.newThread()).subscribe(new Observer<String>() { 
      @Override 
      public void onCompleted() { 
      } 

      @Override 
      public void onError(Throwable throwable) { 
       result.setErrorResult(throwable); 
      } 

      @Override 
      public void onNext(String compositeString) { 
       log.info("Returning compositeObject: " + compositeString); 
       result.setResult(compositeString); 
      } 
     }); 

     return result; 
    } 
} 

Hello:

@Service 
public class HelloService { 

    private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getHello() { 
     log.trace("calling get hello"); 
     return Observable.just(makeNetworkCall()); 
    } 

    private String makeNetworkCall() { 
     log.trace("making hello network call"); 
     return "hello"; 
    } 
} 

World:

@Service 
public class WorldService { 

private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getWorld() { 
     log.trace("calling get world"); 
     return Observable.just(makeNetworkCall()); 
    } 

    private String makeNetworkCall() { 
     log.trace("making world network call"); 
     return "world"; 
    } 
} 

CompositeService:

@Service 
public class CompositeService { 

    private Logger log = LoggerFactory.getLogger(CompositeService.class); 

    @Autowired 
    private HelloService helloService; 

    @Autowired 
    private WorldService worldService; 

    public Observable<String> getCompositeString() { 
     log.trace("Calling getCompositeObject"); 

     Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread()); 
     Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread()); 

     return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f,b)); 
    } 

    private String makeNetworkCall(String hello, String world) { 
     log.trace("making composite network call"); 
     return hello + " " + world; 
    } 
} 

log:

2016-06-16 07:10:13 INFO [http-nio-9050-exec-1] [SampleController.java:32] Received getCompositeObject request 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [CompositeService.java:23] Calling getCompositeObject 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:15] calling get hello 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [HelloService.java:20] making hello network call 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:15] calling get world 
2016-06-16 07:10:13 TRACE [http-nio-9050-exec-1] [WorldService.java:20] making world network call 
2016-06-16 07:10:13 TRACE [RxNewThreadScheduler-3] [CompositeService.java:32] making composite network call 
2016-06-16 07:10:13 INFO [RxNewThreadScheduler-3] [SampleController.java:54] Returning compositeObject: hello world 
+0

fyi, gibt es Schedulers.computation() für cpu gebundene Arbeit und Schedulers.io() für io gebunden - oft bessere Kandidaten als Schedulers.newThread(). –

Antwort

1

Sie wollen Observable::defer für diese Fälle verwenden:

public Observable<String> getWorld() { 
     log.trace("calling get world"); 

     return Observable.defer(() -> makeNetworkCall()); 
    } 

Dies stellt sicher, dass der Code auf jeder Zeit, dass die beobachtbaren abonniert genannt wird, wird auch.

Auch würde ich empfehlen, Schedulers.io(); Es ist ein konfigurierbarer Thread-Pool, der standardmäßig nach Bedarf erweitert wird.

0

Es scheint, das Problem meine Verwendung von Observable.just statt Observable.fromCallable war. Das Ändern des Dienstcodes von oben wie unten ergab das von mir gesuchte Verhalten. Ich bin immer noch auf der Suche nach Feedback, ob dies der "empfohlene" Weg ist. Insbesondere bin ich mir nicht sicher, ob die Verwendung von toBlocking() in CompositeService korrekt ist. Ich werde dieses Muster wahrscheinlich ausführlich in meinem Code verwenden und ich möchte es richtig machen.

Hello:

@Service 
public class HelloService { 

    private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getHello() { 
     log.trace("calling get hello"); 

     return Observable.fromCallable(() -> { 
      return makeNetworkCall(); 
     }); 
    } 

    private String makeNetworkCall() { 
     log.trace("making hello network call"); 
     return "hello"; 
    } 
} 

World:

@Service 
public class WorldService { 

private Logger log = LoggerFactory.getLogger(HelloService.class); 

    public Observable<String> getWorld() { 
     log.trace("calling get world"); 

     return Observable.fromCallable(() -> { 
      return makeNetworkCall(); 
     }); 
    } 

    private String makeNetworkCall() { 
     log.trace("making world network call"); 
     return "world"; 
    } 
} 

CompositeService:

@Service 
public class CompositeService { 

    private Logger log = LoggerFactory.getLogger(CompositeService.class); 

    @Autowired 
    private HelloService helloService; 

    @Autowired 
    private WorldService worldService; 

    public Observable<String> getCompositeString() { 
     return Observable.fromCallable(() -> { 
      return getCompositeStringImpl().toBlocking().single(); 
     }); 
    } 

    public Observable<String> getCompositeStringImpl() { 
     log.trace("Calling getCompositeObject"); 

     Observable<String> foo = helloService.getHello().subscribeOn(Schedulers.newThread()); 
     Observable<String> bar = worldService.getWorld().subscribeOn(Schedulers.newThread()); 

     return Observable.zip(foo, bar, (f, b) -> makeNetworkCall(f, b)); 
    } 

    private String makeNetworkCall(String hello, String world) { 
     log.trace("making composite network call"); 
     return hello + " " + world; 
    } 
} 

Protokolle:

2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [CompositeService.java:29] Calling getCompositeObject 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [HelloService.java:15] calling get hello 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-1] [WorldService.java:15] calling get world 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-2] [HelloService.java:23] making hello network call 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [WorldService.java:23] making world network call 
2016-06-16 08:15:50 TRACE [RxNewThreadScheduler-3] [CompositeService.java:38] making composite network call 
2016-06-16 08:15:50 INFO [RxNewThreadScheduler-1] [SampleController.java:54] Returning compositeObject: hello world