2016-06-09 9 views
3

Ich habe Websocket-Client mit akka-http WebSocket Server verbunden, wie kann ich zuhören Verbindung schließen Ereignisse auf dem Server passiert (db Server Shutdown/Server geschlossen Websocket-Verbindung)?Wie WebSocket Server schließen Ereignisse mit akka-http WebSocket Client

object Client extends App { 


    implicit val actorSystem = ActorSystem("akka-system") 
    implicit val flowMaterializer = ActorMaterializer() 

    val config = actorSystem.settings.config 
    val interface = config.getString("app.interface") 

    val port = config.getInt("app.port") 


    // print each incoming strict text message 
    val printSink: Sink[Message, Future[Done]] = 
    Sink.foreach { 
     case message: TextMessage.Strict => 
     println(message.text) 

     case _ => { 
     sourceQueue.map(q => { 
      println(s"offering message on client") 
      q.offer(TextMessage("received unknown")) 
     }) 
     println(s"received unknown message format") 
     } 
    } 

    val (source, sourceQueue) = { 
    val p = Promise[SourceQueue[Message]] 
    val s = Source.queue[Message](Int.MaxValue, OverflowStrategy.backpressure).mapMaterializedValue(m => { 
     p.trySuccess(m) 
     m 
    }) 
     .keepAlive(FiniteDuration(1, TimeUnit.SECONDS),() => TextMessage.Strict("Heart Beat")) 
    (s, p.future) 
    } 

    val flow = 
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.right) 


    val (upgradeResponse, sourceClose) = 
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow) 

    val connected = upgradeResponse.map { upgrade => 
    // just like a regular http request we can get 404 NotFound, 
    // with a response body, that will be available from upgrade.response 
    if (upgrade.response.status == StatusCodes.SwitchingProtocols || upgrade.response.status == StatusCodes.SwitchingProtocols) { 
     Done 
    } else { 
     throw new RuntimeException(s"Connection failed: ${upgrade.response.status}") 
    } 
    } 


    connected.onComplete(println) 

} 

Antwort

3

Websocket Verbindungsabbruch als regulärer Strom Abschluss modelliert, also in Ihrem Fall, dass Sie die materialisierte Future[Done] von Sink.foreach ergab verwenden:

val flow = Flow.fromSinkAndSourceMat(printSink, source)(Keep.both) 

val (upgradeResponse, (sinkClose, sourceClose)) = 
    Http().singleWebSocketRequest(..., flow) 

sinkClose.onComplete { 
    case Success(_) => println("Connection closed gracefully") 
    case Failure(e) => println("Connection closed with an error: $e") 
} 
+0

danke :), wie kann ich wieder (versuchen) nah dran? – invariant

+0

Das hängt von Ihrer Architektur ab. Der einfachste Weg wäre, glaube ich, sowohl 'singleWebSocketRequest()' Aufruf und 'sinkClose.onComplete' Handler in eine Methode zu extrahieren und diese Methode rekursiv vom' onComplete' Handler aufzurufen. –

+1

ok ty, ich wünschte, sie unterstützen diese out of the box wie socket.io http://stackoverflow.com/questions/13797262/how-to-reconnect-to-websocket-after-close-connection – invariant