2016-07-05 24 views
1

Ich versuche, ein Diagramm innerhalb eines Schauspielers zu materialisieren. Dies scheint zu funktionieren, wenn eine der folgenden Bedingungen erfüllt sind:Materialisieren eines Graphen innerhalb eines Schauspielers

  1. Der Graph enthält keine Broadcast (erstellt mit alsoTo) oder
  2. Das gleiche ActorMaterializer für jeden Materialisierung verwendet wird, oder
  3. Der Graph außerhalb eines Actor

materialisiert ich es reduziert den folgenden Testfälle nach unten:

import java.util.concurrent.{CountDownLatch, TimeUnit} 

import akka.NotUsed 
import akka.actor.{Actor, ActorSystem} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{RunnableGraph, Sink, Source} 
import akka.testkit.{TestActorRef, TestKit} 

import org.scalatest.{FlatSpecLike, Matchers} 

class ActorFlowTest extends TestKit(ActorSystem("ActorFlowTest")) with Matchers with FlatSpecLike { 

    def createGraph(withBroadcast: Boolean) = { 
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    else Source.empty.to(Sink.ignore) 
    } 

    case object Bomb 

    class FlowActor(
    graph: RunnableGraph[NotUsed], 
    latch: CountDownLatch, 
    materializer: (ActorSystem) => ActorMaterializer 
) extends Actor { 

    override def preStart(): Unit = { 
     graph.run()(materializer(context.system)) 
     latch.countDown() 
    } 

    override def receive: Receive = { 
     case Bomb => throw new RuntimeException 
    } 
    } 

    "Without an actor" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    val materializer1 = ActorMaterializer()(system) 
    val materializer2 = ActorMaterializer()(system) 
    graph.run()(materializer1) 
    graph.run()(materializer2) // Pass 
    } 

    "With a the same materializer" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    val materializer = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, _ => materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer but no broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = false) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new FlowActor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Fail 
    } 

    def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = { 
    actorRef.start() 
    actorRef ! Bomb 
    latch.await(25, TimeUnit.SECONDS) 
    } 
} 

Es scheint, dass die letzten Fälle mit dem folgenden Fehler immer in der Log-Timeout werden:

[ERROR] [07/05/2016 16:06:30.625] [ActorFlowTest-akka.actor.default-dispatcher-6] [akka://ActorFlowTest/user/$$c] Futures timed out after [20000 milliseconds] 
akka.actor.PostRestartException: akka://ActorFlowTest/user/$$c: exception post restart (class java.lang.RuntimeException) 
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:250) 
    at akka.actor.dungeon.FaultHandling$$anonfun$6.apply(FaultHandling.scala:248) 
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:303) 
    at akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1.applyOrElse(FaultHandling.scala:298) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) 
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:248) 
    at akka.actor.dungeon.FaultHandling$class.faultRecreate(FaultHandling.scala:76) 
    at akka.actor.ActorCell.faultRecreate(ActorCell.scala:374) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:464) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
    at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:243) 
    at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283) 
    at akka.testkit.CallingThreadDispatcher.systemDispatch(CallingThreadDispatcher.scala:191) 
    at akka.actor.dungeon.Dispatch$class.restart(Dispatch.scala:119) 
    at akka.actor.ActorCell.restart(ActorCell.scala:374) 
    at akka.actor.LocalActorRef.restart(ActorRef.scala:406) 
    at akka.actor.SupervisorStrategy.restartChild(FaultHandling.scala:365) 
    at akka.actor.OneForOneStrategy.processFailure(FaultHandling.scala:518) 
    at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:303) 
    at akka.actor.dungeon.FaultHandling$class.handleFailure(FaultHandling.scala:263) 
    at akka.actor.ActorCell.handleFailure(ActorCell.scala:374) 
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459) 
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:223) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) 
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:167) 
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640) 
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:165) 
    at scala.concurrent.Await$.result(package.scala:190) 
    at akka.stream.impl.ActorMaterializerImpl.actorOf(ActorMaterializerImpl.scala:207) 
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.matGraph(ActorMaterializerImpl.scala:166) 
    at akka.stream.impl.ActorMaterializerImpl$$anon$2.materializeAtomic(ActorMaterializerImpl.scala:150) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:919) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915) 
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:94) 
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:922) 
    at akka.stream.impl.MaterializerSession$$anonfun$materializeModule$1.apply(StreamLayout.scala:915) 
    at scala.collection.immutable.Set$Set4.foreach(Set.scala:200) 
    at akka.stream.impl.MaterializerSession.materializeModule(StreamLayout.scala:915) 
    at akka.stream.impl.MaterializerSession.materialize(StreamLayout.scala:882) 
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:182) 
    at akka.stream.impl.ActorMaterializerImpl.materialize(ActorMaterializerImpl.scala:80) 
    at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:351) 
    at ActorFlowTest$FlowActor.preStart(ActorFlowTest.scala:40) 
    at akka.actor.Actor$class.postRestart(Actor.scala:566) 
    at ActorFlowTest$FlowActor.postRestart(ActorFlowTest.scala:33) 
    at akka.actor.Actor$class.aroundPostRestart(Actor.scala:504) 
    at ActorFlowTest$FlowActor.aroundPostRestart(ActorFlowTest.scala:33) 
    at akka.actor.dungeon.FaultHandling$class.finishRecreate(FaultHandling.scala:239) 
    ... 25 more 

ich ausdrücklich die ActorMaterializers beendet versucht habe, aber dass das Problem nicht reproduzieren.

Eine Abhilfe ist, einen Verschluss um die ActorMaterializer im Props zu schaffen, aber wenn dies auch von einer anderen Actor kam ich mache mir Sorgen, ich schließlich ähnliche Probleme bekommen.

Irgendeine Idee, warum das ist? Offensichtlich hat es etwas mit der ActorMaterializer zu tun, aber interessant, wie das Entfernen der Sendung es auch löst (sogar mit einem viel komplizierteren Graphen).

+0

Können Sie versuchen, postRestart zu überschreiben und zu validieren, was dort passiert (prüfen Sie eingehende Ausnahme und sehen, ob der Code es erreicht)? Vergessen Sie auch nicht, preStart aufzurufen, wenn Sie postRestart überschreiben. – thwiegan

+0

Die 'RuntimeException', die in den' receive' geworfen wird, ist der 'Grund' in' postRestart'. Die 'TimeoutException' wird ausgelöst, wenn' postRestart' 'preStart' aufruft und' graph.run' aufgerufen wird. – Steiny

Antwort

1

Dies scheint im Zusammenhang zu stehen (oder zumindest durch eine ordnungsgemäße) Aufsicht zu lösen. Ich habe einen extra Supervisor -Actor erstellt, der zu Demonstrationszwecken nur einen einzigen FlowActor in seiner preStart Funktion startet und die Bomb Nachrichten an ihn weiterleitet. Die folgenden Tests werden erfolgreich ohne Zeitüberschreitung ausgeführt:

import java.util.concurrent.{CountDownLatch, TimeUnit} 

import akka.NotUsed 
import akka.actor.Actor.Receive 
import akka.actor.SupervisorStrategy._ 
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props} 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{RunnableGraph, Sink, Source} 
import akka.testkit.{TestActorRef, TestKit} 
import org.scalatest.{FlatSpecLike, Matchers} 

import scala.concurrent.duration._ 

class ActorFlowTest extends TestKit(ActorSystem("TrikloSystem")) with Matchers with FlatSpecLike { 

    def createGraph(withBroadcast: Boolean) = { 
    if (withBroadcast) Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    else Source.empty.to(Sink.ignore) 
    } 

    case object Bomb 

    class Supervisor(graph: RunnableGraph[NotUsed], 
        latch: CountDownLatch, 
        materializer: (ActorSystem) => ActorMaterializer) extends Actor { 

    var actorRef: Option[ActorRef] = None 

    override def preStart(): Unit = { 
     actorRef = Some(context.actorOf(Props(new FlowActor(graph, latch, materializer)))) 
    } 

    override def receive: Receive = { 
     case Bomb => actorRef.map(_ ! Bomb) 
    } 
    } 

    class FlowActor(
        graph: RunnableGraph[NotUsed], 
        latch: CountDownLatch, 
        materializer: (ActorSystem) => ActorMaterializer 
       ) extends Actor { 

    override def preStart(): Unit = { 
     graph.run()(materializer(context.system)) 
     latch.countDown() 
    } 

    override def receive: Receive = { 
     case Bomb => 
     throw new RuntimeException 
    } 
    } 

    "Without an actor" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    val materializer1 = ActorMaterializer()(system) 
    val materializer2 = ActorMaterializer()(system) 
    graph.run()(materializer1) 
    graph.run()(materializer2) // Pass 
    } 

    "With a the same materializer" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    val materializer = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, _ => materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer but no broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = false) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Pass 
    } 

    "With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = createGraph(withBroadcast = true) 
    val latch = new CountDownLatch(2) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef = TestActorRef(new Supervisor(graph, latch, materializer)) 
    verify(actorRef, latch) should be(true) // Fail 
    } 

    def verify(actorRef: TestActorRef[_], latch: CountDownLatch): Boolean = { 
    actorRef.start() 
    actorRef ! Bomb 
    latch.await(25, TimeUnit.SECONDS) 
    } 
} 
+0

Dies ist nützlich, aber es scheint nur zur Liste der Vorbehalte hinzuzufügen. Warum sollte der User Guardian Actor anders sein als der Supervisor? Mit anderen Worten, wie ändert dies (oder eine der anderen Vorbehalte) wirklich das Verhalten? – Steiny

0

In diesem Test gibt es einige Fehlfunktionen des Akka TestKit.

TestActorRef ist ein ganz besonderes Test-Konstrukt, dass es auf dem aufrufenden Thread ausgeführt wird (CallingThreadDispatcher), um einfache synchrone Einheitstests zu ermöglichen. Die Verwendung von CountDownLatch in einem synchronen Test ist seltsam, da jede Aktion auf demselben Thread ausgeführt wird, so dass keine Kommunikation zwischen Threads erforderlich ist.

Wenn Sie eine Instanz von TestActorRef erstellen, wird sie in demselben Aufruf gestartet (Sie können dies sehen, indem Sie zum Beispiel eine Exception vom Konstruktor oder preStart werfen und sehen, dass sie in Ihrem Testfall endet).

Aufruf an der ActorRef ist definitiv nicht etwas, was Sie tun sollten, TestActorRef s besondere Art gibt Ihnen Zugriff darauf, aber Sie nennen im Wesentlichen Start auf einem leeren Shell-Schauspieler, und nicht der Schauspieler Sie denken, Sie interagieren mit (und wenn es dieser Schauspieler wäre, wäre es immer noch falsch, start() darauf zu nennen).

Ein richtig (aber nicht sehr nützlich, da es kein Problem ist ein Diagramm zweimal unabhängig vom Kontext oder Materialisierer materialisiert) Test, was Sie beabsichtigen Test zu wiederholen, ohne die Klinke würde und in etwa so aussehen:

class FlowActor(graph: RunnableGraph[NotUsed], materializer: (ActorSystem) => ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    graph.run()(materializer(context.system)) 
    } 
    override def receive: Receive = Actor.emptyBehavior 
} 

"With a new materializer and a broadcast" should "be able to materialize twice" in { 
    val graph = Source.empty.alsoTo(Sink.ignore).to(Sink.ignore) 
    def materializer(system: ActorSystem) = ActorMaterializer()(system) 
    val actorRef1 = TestActorRef(new FlowActor(graph, materializer)) 
    val actorRef2 = TestActorRef(new FlowActor(graph, materializer)) 
    // we'd get an exception here if it was not possible to materialize 
    // since pre-start is run on the calling thread - the same thread 
    // that is executing the test case 
} 

Ich würde nur die spezifischen Seltsamkeiten von diesem gehen, anstatt tiefer in die Magie in TestActorRef zu graben, wird es hart verdient Einsichten sein und sie werden in vielen Fällen nicht anwendbar sein, aber diese spezifische.