2013-10-20 4 views
5

Ich habe ein Beispiel, das ich mit Play Framework 2.2.0-Scala erstellen, die WebSockets verwendet, um Daten an einen Client zu streamen. Das Problem, das ich habe, ist, dass aus irgendeinem Grund, eines der Kinder eines übergeordneten Schauspielers nicht ordnungsgemäß heruntergefahren wird. Alle Protokolle zeigen an, dass es angehalten wird und heruntergefahren wurde, aber ich sehe, dass es nicht wirklich down ist, indem es Daten veröffentlicht. Hier einige Code ist, zuerst mit meinem Controller-Aktion:Scala: Akka-Schauspieler stirbt nicht in Play Framework 2.2.0

def scores(teamIds: String) = WebSocket.async[JsValue] { request => 
    val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el => 
     s"nfl-streaming-scores-${el}" 
    } 

    val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr))) 
    ScoresStream.join(scoresStream) 
    } 

Also jedes Mal, wenn ein Client eine Verbindung herstellt, sie ScoresStream beitreten, die den jeweiligen Iteratee zurückgibt, Enumerator WebSocket.async erfordert. Die tatsächliche ScoresStream Objekt sieht wie folgt aus:

object ScoresStream { 

    implicit val timeout = Timeout(5 seconds) 

    def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = { 

    (scoresStream ? BeginStreaming).map { 

     case Connected(enumerator) => 
     val iteratee = Iteratee.foreach[JsValue] { _ => 
      Logger.info("Ignore iteratee input.") 
     }.map { _ => 
      Logger.info("Client quitting - killing Actor.") 
      scoresStream ! UnsubscribeAll 
      scoresStream ! PoisonPill 
     } 
     (iteratee,enumerator) 
} 

Die Idee hier ist der Hauptdarsteller zu töten, ScoresStream, wenn der Client die Verbindung trennt. Ich mache das mit scoresStream ! PoisonPill.

ScoresStream wiederum schafft Pub und Sub Instanzen, die Wrapper sind, die für die Veröffentlichung/Anreißen, um Nachrichten Redis verbinden, hier ist der Schauspieler Code:

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging { 

    val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue] 

    case class Message(kind: String, user: String, message: String) 
    implicit val messageReads = Json.reads[Message] 
    implicit val messageWrites = Json.writes[Message] 

    val sub = context.child("sub") match { 
    case None => createSub(scoresChannel) 
    case Some(c) => c 
    } 

    val pub = context.child("pub") match { 
    case None  => createPub(teamIds) 
    case Some(c) => c 
    } 

    def receive = { 
    case BeginStreaming => { 
     log.info("hitting join...") 
     sub ! RegisterCallback 
     sub ! SubscribeChannel(teamIds) 
     sender ! Connected(scoresEnumerator) 
    } 

    case UnsubscribeAll => { 
     sub ! UnsubscribeChannel(teamIds) 
    } 
    } 

} 

trait CreatePubSub { self:Actor => 
    def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub") 
    def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub") 
} 

Schließlich ist hier die eigentliche Sub Schauspieler Code: (Pub doesn ‚t hier relevant erscheinen, wie es fein wird heruntergefahren):

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging { 
    val s = context.child("subscriber") match { 
    case None => createSubscriber 
    case Some(c) => c 
    } 

    def callback(pubsub: PubSubMessage) = pubsub match { 
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup") 
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no) 
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) 
    case M(channel, msg) => 
     msg match { 
     // exit will unsubscribe from all channels and stop subscription service 
     case "exit" => 
      println("unsubscribe all ..") 
      pChannel.end 
      r.unsubscribe 

     // message "+x" will subscribe to channel x 
     case x if x startsWith "+" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } 
      } 

     // message "-x" will unsubscribe from channel x 
     case x if x startsWith "-" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) 
             pChannel.end 
      } 

     case x => 
     try { 
      log.info("Just got a message: " + x) 
      pChannel.push(Json.parse(x)) 
      } 
      catch { 
      case ex: com.fasterxml.jackson.core.JsonParseException => { 
       log.info("Malformed JSON sent.") 
      } 
      } 
     } 
    } 

    def receive = { 
    case RegisterCallback => { 
     log.info("Creating a subscriber and registering callback") 
     s ! Register(callback) 
    } 
    case SubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("subscribing to channel " + x + " ") } 
     //sub ! Subscribe(Array("scores-5","scores-6")) 
     s ! Subscribe(teamIds) 
    } 
    case UnsubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") } 
     s ! Unsubscribe(teamIds) 
    } 
    case true => println("Subscriber successfully received message.") 
    case false => println("Something went wrong.") 
    } 
} 

trait CreatePublisherSubscriber { self:Actor => 
    def r = new RedisClient("localhost", 6379) 
    def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber") 
    def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher") 
} 

Nun, wenn ein Client eine Verbindung, sehen die Startmeldungen gesund:

012.351.
[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921] 
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991] 
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started ([email protected]) 
Subscriber successfully received message. 
Subscriber successfully received message. 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862] 
subscribed to nfl-streaming-scores-5 and count = 1 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539] 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join... 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514] 

und Trennen sieht gesund:

[info] application - Client quitting - killing Actor. 
unsubscribed from nfl-streaming-scores-5 and count = 0 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters]) 
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped 
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped 

Und hier ist das Problem, nachdem die Client-Verbindung beendet hat, werde ich eine Nachricht senden, dass die zur Zeit Shutdown Schauspieler abonniert wurde:

redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}" 

und hier ist es angezeigt, wenn es nicht sein sollte, sollte dieser Schauspieler technisch tot sein. Andere Akteure, die vorher schon da waren, erhalten die Nachricht ebenfalls, die mit $ a/$ b gekennzeichnet sind. Ich kann bestätigen, dass keine anderen Clients verbunden sind.

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"} 

Was ist auch ein seltsamer Indikator ist, dass Adressnamen nie wieder verwendet werden. Ich sehe immer wieder ein Trend wie die folgenden Namen laichen, wenn ich trennen/verbinden:

akka://application/user/$c 
akka://application/user/$d 
akka://application/user/$e 

nie sehen alte Referenzen wieder gewöhnen.

Meine Annahme hier ist, dass die Verbindung zu Redis nicht sauber geschlossen wird. Es erklärt nicht, warum Logs sagen, dass der Actor aufgehört hat, aber immer noch existiert, aber ich sehe auf jeden Fall Verbindungen, die nach der Ausführung von netstat wieder hergestellt werden, auch nachdem alle Actors vermutlich tot sind. Wenn ich die Anwendung vollständig aus dem Laufen lasse, löschen diese Verbindungen. Es ist so, als würde die Abmeldung stillschweigend fehlschlagen, und das hält den Actor am Leben und auch die Verbindung, die aus mehreren Gründen wirklich schlecht ist, weil das System schließlich keine Dateideskriptoren mehr hat und/oder Speicherlecks aufweist. Ist hier etwas offensichtlich, dass ich falsch mache?

+1

'def r = new RedisClient', ich denke, faul val wäre besser, so dass Sie nur eine Instanz des RedisClient erstellen, anstatt jedes Mal eine neue Instanz zu erstellen, wenn 'r.doSomeThing' aufgerufen wird. – Schleichardt

+0

Ich habe das versucht. Ich habe es sogar aus dem Trait und direkt in den Actor verschoben. Trotzdem ist das Endergebnis dasselbe. mit 'redis-cli' sehe ich auch den UNSUBSCRIBE. Die Verbindung ist jedoch immer noch hergestellt. Die Art und Weise, wie ich Schauspieler hier aufstelle/niederreiße, finde ich mühsam. – randombits

+0

Automatisch erzeugte Darstellernamen werden niemals wiederverwendet. Dies ist der sicherste Weg, sie eindeutig zu machen. Ich vermute, dass der Subscriber Actor (den Sie nicht anzeigen) den Callback an die Redis-Client-Bibliothek weitergibt, wo er ausgeführt wird, wenn Nachrichten ankommen - unabhängig vom Actor. –

Antwort

3

Nur weil Sie den Actor stoppen, heißt das nicht, dass Ressourcen, die diesem Actor gehören, automatisch bereinigt werden.Wenn eine RedisClient mit dieser Actor-Instanz verknüpft ist und diese Verbindung angehalten werden muss, um ordnungsgemäß bereinigt zu werden, sollten Sie in der Methode postStop so etwas tun. Ich stimme auch mit @Schleichardt darin überein, dass Sie Ihre def r = new RedisClient in ein val oder ein lazy val ändern sollten (abhängig von der Reihenfolge der Initialisierung und den Bedürfnissen). Auf diese Weise wissen Sie, dass pro Teilnehmerinstanz nur eine einzige RedisClient zu bereinigen ist. Ich kenne nicht die API für die RedisClient, die Sie verwenden, aber sagen wir mal, es hat eine shutdown Methode, die ihre Verbindung beendet und ihre Ressourcen aufräumt. Dann können Sie einfach einen postStop an den Teilnehmer Schauspieler hinzufügen wie folgt:

override def postStop { 
    r.shutdown 
} 

Angenommen Sie haben die def zu val Änderung vornehmen, könnte das sein, was Sie suchen.