5

Spring 5 Reactive fails when extending Flux / implementing Publisher and calling s.onNext() more than once

I have just started with the new reactive support in Spring 5 and wanted to simulate some asynchronous data generation. I ran into two faulty behaviors:

1) Calling s.onNext(String) more than once:

@GetMapping("/strings") 
public Publisher<String> getStrings(){ 

    return new Publisher<String>() { 

     @Override 
     public void subscribe(Subscriber<? super String> s) { 
      int i = 0; 
      while(++i <= 5){ 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       s.onNext("message"); 
      } 
      s.onComplete(); 
     } 
    }; 
} 

In this case the stack trace is:

2016-08-03 13:35:04.986 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /strings 
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<java.lang.String> com.codependent.spring5.playground.reactive.web.AccountsController.getStrings()] 
2016-08-03 13:35:04.994 DEBUG 5136 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsController' 
2016-08-03 13:35:07.120 DEBUG 5136 --- [nio-8080-exec-1] o.s.w.s.h.ExceptionHandlingWebHandler : Could not complete request 

java.lang.IllegalStateException: RECEIVED 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) [classes/:na] 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) [spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) [reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) [spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] 

2016-08-03 13:35:07.121 DEBUG 5136 --- [nio-8080-exec-1] o.s.h.s.r.ServletServerHttpResponse  : Can't set the status 500 because the HTTP response has already been committed 
2016-08-03 13:35:08.127 ERROR 5136 --- [nio-8080-exec-1] a.c.c.C.[.[.0.0.0.[.[httpHandlerServlet] : Servlet.service() for servlet [httpHandlerServlet] in context with path [] threw exception 

reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: RECEIVED 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onError(MonoThenApply.java:209) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onError(FluxResume.java:105) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.OperatorAdapter.doOnSubscriberError(OperatorAdapter.java:113) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:91) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at com.codependent.spring5.playground.reactive.web.AccountsController$4.subscribe(AccountsController.java:107) ~[classes/:na] 
    at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:59) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:73) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ChannelSendOperator.subscribe(ChannelSendOperator.java:54) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$DeferredScalarSubscriber.complete(Operators.java:797) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager$SecondSubscriber.onNext(MonoThenApply.java:203) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:130) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1293) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:186) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:100) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:51) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:69) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply$MonoThenApplyManager.onNext(MonoThenApply.java:133) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:71) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:383) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:192) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:96) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:60) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:116) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:45) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwiseIfEmpty.subscribe(MonoOtherwiseIfEmpty.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenApply.subscribe(MonoThenApply.java:58) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoOtherwise.subscribe(MonoOtherwise.java:47) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply$MonoConcatIgnoreManager.drain(MonoThenSupply.java:167) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at reactor.core.publisher.MonoThenSupply.subscribe(MonoThenSupply.java:55) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    at org.springframework.http.server.reactive.ServletHttpHandlerAdapter.service(ServletHttpHandlerAdapter.java:93) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:230) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:108) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:522) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:349) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1110) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:785) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1425) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_45] 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_45] 
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.4.jar:8.5.4] 
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_45] 
Caused by: java.lang.IllegalStateException: RECEIVED 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor$State.onNext(AbstractResponseBodyProcessor.java:316) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:77) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.AbstractResponseBodyProcessor.onNext(AbstractResponseBodyProcessor.java:47) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at org.springframework.http.server.reactive.ChannelSendOperator$WriteWithBarrier.doNext(ChannelSendOperator.java:97) ~[spring-web-5.0.0.M1.jar:5.0.0.M1] 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) ~[reactor-core-3.0.0.BUILD-SNAPSHOT.jar:na] 
    ... 57 common frames omitted 

2) Calling s.onNext(Alert) (Alert is a plain DTO) more than once:

@GetMapping("/alerts") 
public Publisher<Alert> getAlerts(){ 

    return new Publisher<Alert>() { 

     @Override 
     public void subscribe(Subscriber<? super Alert> s) { 
      int i = 0; 
      while(++i <= 5){ 
       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       s.onNext(new Alert((long)1, "ms")); 
      } 
      s.onComplete(); 
     } 
    }; 
} 

This time no error shows up in the logs, but the caller receives a 500 response code and the body '['.

Log:

2016-08-03 13:37:11.834 DEBUG 5136 --- [nio-8080-exec-3] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/alerts] 
2016-08-03 13:37:11.835 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /alerts 
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public org.reactivestreams.Publisher<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsController.getAlerts()] 
2016-08-03 13:37:11.836 DEBUG 5136 --- [nio-8080-exec-3] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsController' 

Why can't we call onNext() more than once? How can we do that?

NOTE: if I call onNext only once, it works fine:

@Override 
public void subscribe(Subscriber<? super String> s) { 
    s.onNext("my message" + Math.random()); 
    s.onComplete(); 
} 

or

@Override 
public void subscribe(Subscriber<? super Alert> s) { 
    s.onNext(new Alert((long)1, "ms")); 
    s.onComplete(); 
} 
+0

What happens if you return something simpler, e.g. 'Flux.just(new SensorRead(sensorId, Math.random()));'? – Will

+0

In that case it works fine. – codependent

Answer

4

My Publisher implementation did not follow the Reactive Streams spec. This is how I fixed it:

@GetMapping(value="/strings", produces="text/event-stream") 
public Publisher<String> getStrings(){ 
    return new Publisher<String>() { 

     private int loops = 5; 

     @Override 
     public void subscribe(Subscriber<? super String> s) { 

      s.onSubscribe(new Subscription() { 
       @Override 
       public void request(long n) { 
        for (int i = 0; i < n; i++) { 
         if(loops-- > 0){ 
          try { 
           Thread.sleep(1000); 
          } catch (InterruptedException e) { 
           e.printStackTrace(); 
          } 
          s.onNext("message"+Math.random());       
         }else{
          s.onComplete();
          return; // stop here so onComplete is not signalled again within this request() call
         }
        } 
       } 
       @Override 
       public void cancel() { 
        loops = 0; 
       } 
      }); 
     } 
    }; 
} 

If you want to learn more, take a look at the issue I opened in Spring's JIRA and the helpful comment I received there.
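The contract the fix relies on can be sketched without Spring or Reactor. The example below is a minimal, single-threaded sketch that uses the JDK's java.util.concurrent.Flow interfaces (Java 9+) as a stand-in for org.reactivestreams; the class names DemandDemo, CountingPublisher and CollectingSubscriber are made up for illustration. It shows the two points the original code missed: subscribe() must hand over a Subscription before any other signal, and onNext() may only be called for elements the subscriber has requested. It is not thread-safe and lets request() recurse, so treat it as an illustration rather than a spec-compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDemo {

    /** Hypothetical publisher: emits `total` messages, but only on demand. */
    static final class CountingPublisher implements Flow.Publisher<String> {
        private final int total;

        CountingPublisher(int total) {
            this.total = total;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> s) {
            // Hand over a Subscription first; nothing is emitted until request() is called.
            s.onSubscribe(new Flow.Subscription() {
                private int sent = 0;          // state is kept per subscriber
                private boolean done = false;  // true once completed, failed or cancelled

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        s.onError(new IllegalArgumentException("request must be positive"));
                        return;
                    }
                    // Emit at most n elements, and never more than remain.
                    for (long i = 0; i < n && !done && sent < total; i++) {
                        sent++;
                        s.onNext("message " + sent);
                    }
                    // Signal completion exactly once.
                    if (!done && sent == total) {
                        done = true;
                        s.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Hypothetical subscriber: requests one element at a time and records every signal. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            events.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            events.add("error: " + t);
        }

        @Override
        public void onComplete() {
            events.add("complete");
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        new CountingPublisher(3).subscribe(subscriber);
        System.out.println(subscriber.events);
    }
}
```

Keeping the counter inside the Subscription rather than in the Publisher means a second subscriber starts from zero instead of sharing the first subscriber's progress.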

1

I haven't tested this yet as I'm a bit busy. I will later, so apologies if it doesn't work! :)

From the comments above, it looks like the problem is with how you create your Flux.

I assume Spring Reactive controllers are able to handle a Flux that emits multiple items without this going over WebSockets or SSE. Again, I'll have a play with it a bit later.

Flux has many static factory methods that will help you here.

How about doing it the following way:

return Flux.intervalMillis(1000) 
.map(l -> new SensorRead(sensorId, Math.random()));

But this will give you a never-ending stream, which may not be what you want.

The other option is something like this:

return Flux.range(1, 5) //Spit out 5 values starting from 1 
.delayMillis(1000) //Delay the onNext calls to separate 1 second apart 
.map(l -> new SensorRead(sensorId, Math.random()));

Update

OK, so this question has changed quite considerably.

In answer to "Why can't we call onNext() more than once? How can we do that?"

Obviously I didn't write the API, so I can't speak to the reasoning, but IMO there is ambiguity and complexity in how multiple emissions could be handled, given the myriad of different ways to do it.

HTTP 1.1 does not allow multiple responses per request, so the only valid options are to collect the emissions into a list, or to write each onNext to the output stream at a low level as it is emitted. Both have complexities around content type (e.g. XML vs JSON).

This gets even more complicated when we bring in HTTP2, WebSockets and SSE, each of which can do some form of multiple responses per request, and each of which again needs to be handled differently.

If you want to be able to make multiple emissions, then you need to look at WebSockets or SSE.

The Spring Reactive project has SSE classes, and it looks like SSE support is implemented.

E.g.
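To make the difference between those options concrete, here is a small, dependency-free sketch of how the same emissions could be framed on the wire. It is not Spring's actual encoder: the class ResponseFraming and its methods are hypothetical, the JSON escaping only handles quotes and backslashes, and the SSE helper assumes a single-line payload. The first method needs every emission before it can produce a body, while the second can be written and flushed per onNext.

```java
import java.util.List;
import java.util.stream.Collectors;

final class ResponseFraming {

    /** Option 1: buffer all emissions, then send them as one JSON array body. */
    static String asJsonArray(List<String> emissions) {
        return emissions.stream()
                // Minimal escaping for illustration only (backslash and double quote).
                .map(e -> "\"" + e.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    /** Option 2: frame a single emission as a Server-Sent Event (text/event-stream). */
    static String asSseEvent(String emission) {
        // An event is a "data:" line followed by a blank line; assumes no newlines in the payload.
        return "data: " + emission + "\n\n";
    }

    public static void main(String[] args) {
        List<String> emissions = List.of("message 1", "message 2");
        System.out.println(asJsonArray(emissions));
        emissions.forEach(e -> System.out.print(asSseEvent(e)));
    }
}
```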

@RequestMapping("/sse/event") 
    Flux<SseEvent> sse() { 
     return Flux.interval(Duration.ofMillis(100)).map(l -> { 
      SseEvent event = new SseEvent(); 
      event.setId(Long.toString(l)); 
      event.setData("foo"); 
      event.setComment("bar"); 
      return event; 
     }).take(2); 
    } 

See the following for more examples:

https://github.com/spring-projects/spring-reactive/blob/master/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

Hope that helps

+0

Thank you for your suggestion, it definitely works that way. Still, I would appreciate it if someone could clarify why we can't simply implement 'Publisher.subscribe(s)' and call 's.onNext()'. – codependent

+0

All my experience is with 'RxJava' and I have no experience with 'Reactor', sorry (though I'm just starting to teach myself), so I can't comment right away, but I'll take a look. In general, constructing with the static methods is easier, and in 'RxJava' the 'Observable' constructor is protected, so you can't construct it directly. – Will

+0

Oops, I missed your update, thanks for your answer. Your example with the "/sse/event" endpoint + Flux works fine. However, it doesn't resolve my doubt: Spring Reactive follows the Reactive Streams specification, so it should be possible to use the Publisher interface and therefore to call 's.onNext()' multiple times. Following your advice, I also added 'produces="text/event-stream"' to my request mapping and it still throws the same exception :-( – codependent